Header And Logo

PostgreSQL
| The world's most advanced open source database.

walsender.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * walsender.c
00004  *
00005  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
00006  * care of sending XLOG from the primary server to a single recipient.
00007  * (Note that there can be more than one walsender process concurrently.)
00008  * It is started by the postmaster when the walreceiver of a standby server
00009  * connects to the primary server and requests XLOG streaming replication.
00010  *
00011  * A walsender is similar to a regular backend, ie. there is a one-to-one
00012  * relationship between a connection and a walsender process, but instead
00013  * of processing SQL queries, it understands a small set of special
00014  * replication-mode commands. The START_REPLICATION command begins streaming
00015  * WAL to the client. While streaming, the walsender keeps reading XLOG
00016  * records from the disk and sends them to the standby server over the
00017  * COPY protocol, until either side ends the replication by exiting COPY
00018  * mode (or until the connection is closed).
00019  *
00020  * Normal termination is by SIGTERM, which instructs the walsender to
00021  * close the connection and exit(0) at next convenient moment. Emergency
00022  * termination is by SIGQUIT; like any backend, the walsender will simply
00023  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
00024  * are treated as not a crash but approximately normal termination;
00025  * the walsender will exit quickly without sending any more XLOG records.
00026  *
00027  * If the server is shut down, postmaster sends us SIGUSR2 after all
00028  * regular backends have exited and the shutdown checkpoint has been written.
00029  * This instructs walsender to send any outstanding WAL, including the
00030  * shutdown checkpoint record, and then exit.
00031  *
00032  *
00033  * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
00034  *
00035  * IDENTIFICATION
00036  *    src/backend/replication/walsender.c
00037  *
00038  *-------------------------------------------------------------------------
00039  */
00040 #include "postgres.h"
00041 
00042 #include <signal.h>
00043 #include <unistd.h>
00044 
00045 #include "access/timeline.h"
00046 #include "access/transam.h"
00047 #include "access/xlog_internal.h"
00048 #include "catalog/pg_type.h"
00049 #include "funcapi.h"
00050 #include "libpq/libpq.h"
00051 #include "libpq/pqformat.h"
00052 #include "miscadmin.h"
00053 #include "nodes/replnodes.h"
00054 #include "replication/basebackup.h"
00055 #include "replication/syncrep.h"
00056 #include "replication/walreceiver.h"
00057 #include "replication/walsender.h"
00058 #include "replication/walsender_private.h"
00059 #include "storage/fd.h"
00060 #include "storage/ipc.h"
00061 #include "storage/pmsignal.h"
00062 #include "storage/proc.h"
00063 #include "storage/procarray.h"
00064 #include "tcop/tcopprot.h"
00065 #include "utils/builtins.h"
00066 #include "utils/guc.h"
00067 #include "utils/memutils.h"
00068 #include "utils/ps_status.h"
00069 #include "utils/resowner.h"
00070 #include "utils/timeout.h"
00071 #include "utils/timestamp.h"
00072 
00073 /*
00074  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
00075  *
00076  * We don't have a good idea of what a good value would be; there's some
00077  * overhead per message in both walsender and walreceiver, but on the other
00078  * hand sending large batches makes walsender less responsive to signals
00079  * because signals are checked only between messages.  128kB (with
00080  * default 8k blocks) seems like a reasonable guess for now.
00081  */
00082 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
00083 
00084 /* Array of WalSnds in shared memory */
00085 WalSndCtlData *WalSndCtl = NULL;
00086 
00087 /* My slot in the shared memory array */
00088 WalSnd     *MyWalSnd = NULL;
00089 
00090 /* Global state */
00091 bool        am_walsender = false;       /* Am I a walsender process ? */
00092 bool        am_cascading_walsender = false;     /* Am I cascading WAL to
00093                                                  * another standby ? */
00094 
00095 /* User-settable parameters for walsender */
00096 int         max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
00097 int         wal_sender_timeout = 60 * 1000; /* maximum time to send one
00098                                                  * WAL data message; the initial
                                                 * value 60 * 1000 is presumably
                                                 * in milliseconds (TODO confirm
                                                 * the unit against the setting's
                                                 * definition) */
00099 /*
00100  * State for WalSndWakeupRequest
 *
 * NOTE(review): nothing in this part of the file reads or writes this flag;
 * presumably it is set and consumed by the wakeup-request code elsewhere --
 * confirm before relying on it.
00101  */
00102 bool wake_wal_senders = false;
00103 
00104 /*
00105  * These variables are used similarly to openLogFile/Id/Seg/Off,
00106  * but for walsender to read the XLOG.
00107  */
00108 static int  sendFile = -1;
00109 static XLogSegNo sendSegNo = 0;
00110 static uint32 sendOff = 0;
00111 
00112 /* Timeline ID of the currently open file */
00113 static TimeLineID   curFileTimeLine = 0;
00114 
00115 /*
00116  * These variables keep track of the state of the timeline we're currently
00117  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
00118  * the timeline is not the latest timeline on this server, and the server's
00119  * history forked off from that timeline at sendTimeLineValidUpto.
00120  */
00121 static TimeLineID   sendTimeLine = 0;
00122 static TimeLineID   sendTimeLineNextTLI = 0;
00123 static bool         sendTimeLineIsHistoric = false;
00124 static XLogRecPtr   sendTimeLineValidUpto = InvalidXLogRecPtr;
00125 
00126 /*
00127  * How far have we sent WAL already? This is also advertised in
00128  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
00129  */
00130 static XLogRecPtr sentPtr = 0;
00131 
00132 /* Buffers for constructing outgoing messages and processing reply messages.
 * reply_message is reset and then refilled by pq_getmessage() for every
 * message read from the standby (see ProcessRepliesIfAny and
 * ProcessStandbyMessage). */
00133 static StringInfoData output_message;
00134 static StringInfoData reply_message;
00135 static StringInfoData tmpbuf;
00136 
00137 /*
00138  * Timestamp of the last receipt of the reply from the standby.
 * Set by ProcessRepliesIfAny() once it has consumed at least one message.
00139  */
00140 static TimestampTz last_reply_timestamp;
00141 /* Have we sent a heartbeat message asking for reply, since last reply?
 * Cleared by ProcessRepliesIfAny() whenever last_reply_timestamp is updated. */
00142 static bool ping_sent = false;
00143 
00144 /*
00145  * While streaming WAL in Copy mode, streamingDoneSending is set to true
00146  * after we have sent CopyDone. We should not send any more CopyData messages
00147  * after that. streamingDoneReceiving is set to true when we receive CopyDone
00148  * from the other end. When both become true, it's time to exit Copy mode.
 *
 * Both flags are reset to false by StartReplication() before streaming
 * begins.
00149  */
00150 static bool streamingDoneSending;
00151 static bool streamingDoneReceiving;
00152 
00153 /* Flags set by signal handlers for later service in main loop */
00154 static volatile sig_atomic_t got_SIGHUP = false;
00155 static volatile sig_atomic_t walsender_ready_to_stop = false;
00156 
00157 /*
00158  * This is set while we are streaming. When not set, SIGUSR2 signal will be
00159  * handled like SIGTERM. When set, the main loop is responsible for checking
00160  * walsender_ready_to_stop and terminating when it's set (after streaming any
00161  * remaining WAL).
 *
 * StartReplication() sets it to true just before calling WalSndLoop() and
 * clears it afterwards; WalSndErrorCleanup() also clears it.
00162  */
00163 static volatile sig_atomic_t replication_active = false;
00164 
00165 /* Signal handlers */
00166 static void WalSndSigHupHandler(SIGNAL_ARGS);
00167 static void WalSndXLogSendHandler(SIGNAL_ARGS);
00168 static void WalSndLastCycleHandler(SIGNAL_ARGS);
00169 
00170 /* Prototypes for private functions */
00171 static void WalSndLoop(void);
00172 static void InitWalSenderSlot(void);
00173 static void WalSndKill(int code, Datum arg);
00174 static void XLogSend(bool *caughtup);
00175 static XLogRecPtr GetStandbyFlushRecPtr(void);
00176 static void IdentifySystem(void);
00177 static void StartReplication(StartReplicationCmd *cmd);
00178 static void ProcessStandbyMessage(void);
00179 static void ProcessStandbyReplyMessage(void);
00180 static void ProcessStandbyHSFeedbackMessage(void);
00181 static void ProcessRepliesIfAny(void);
00182 static void WalSndKeepalive(bool requestReply);
00183 
00184 
00185 /* Initialize walsender process before entering the main command loop */
00186 void
00187 InitWalSender(void)
00188 {
00189     am_cascading_walsender = RecoveryInProgress();
00190 
00191     /* Create a per-walsender data structure in shared memory */
00192     InitWalSenderSlot();
00193 
00194     /* Set up resource owner */
00195     CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
00196 
00197     /*
00198      * Let postmaster know that we're a WAL sender. Once we've declared us as
00199      * a WAL sender process, postmaster will let us outlive the bgwriter and
00200      * kill us last in the shutdown sequence, so we get a chance to stream all
00201      * remaining WAL at shutdown, including the shutdown checkpoint. Note that
00202      * there's no going back, and we mustn't write any WAL records after this.
00203      */
00204     MarkPostmasterChildWalSender();
00205     SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
00206 }
00207 
00208 /*
00209  * Clean up after an error.
00210  *
00211  * WAL sender processes don't use transactions like regular backends do.
00212  * This function does any cleanup requited after an error in a WAL sender
00213  * process, similar to what transaction abort does in a regular backend.
00214  */
00215 void
00216 WalSndErrorCleanup()
00217 {
00218     if (sendFile >= 0)
00219     {
00220         close(sendFile);
00221         sendFile = -1;
00222     }
00223 
00224     replication_active = false;
00225     if (walsender_ready_to_stop)
00226         proc_exit(0);
00227 
00228     /* Revert back to startup state */
00229     WalSndSetState(WALSNDSTATE_STARTUP);
00230 }
00231 
00232 /*
00233  * Handle the IDENTIFY_SYSTEM command.
00234  */
00235 static void
00236 IdentifySystem(void)
00237 {
00238     StringInfoData buf;
00239     char        sysid[32];
00240     char        tli[11];
00241     char        xpos[MAXFNAMELEN];
00242     XLogRecPtr  logptr;
00243 
00244     /*
00245      * Reply with a result set with one row, three columns. First col is
00246      * system ID, second is timeline ID, and third is current xlog location.
00247      */
00248 
00249     snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
00250              GetSystemIdentifier());
00251 
00252     am_cascading_walsender = RecoveryInProgress();
00253     if (am_cascading_walsender)
00254     {
00255         /* this also updates ThisTimeLineID */
00256         logptr = GetStandbyFlushRecPtr();
00257     }
00258     else
00259         logptr = GetInsertRecPtr();
00260 
00261     snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
00262 
00263     snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
00264 
00265     /* Send a RowDescription message */
00266     pq_beginmessage(&buf, 'T');
00267     pq_sendint(&buf, 3, 2);     /* 3 fields */
00268 
00269     /* first field */
00270     pq_sendstring(&buf, "systemid");    /* col name */
00271     pq_sendint(&buf, 0, 4);     /* table oid */
00272     pq_sendint(&buf, 0, 2);     /* attnum */
00273     pq_sendint(&buf, TEXTOID, 4);       /* type oid */
00274     pq_sendint(&buf, -1, 2);    /* typlen */
00275     pq_sendint(&buf, 0, 4);     /* typmod */
00276     pq_sendint(&buf, 0, 2);     /* format code */
00277 
00278     /* second field */
00279     pq_sendstring(&buf, "timeline");    /* col name */
00280     pq_sendint(&buf, 0, 4);     /* table oid */
00281     pq_sendint(&buf, 0, 2);     /* attnum */
00282     pq_sendint(&buf, INT4OID, 4);       /* type oid */
00283     pq_sendint(&buf, 4, 2);     /* typlen */
00284     pq_sendint(&buf, 0, 4);     /* typmod */
00285     pq_sendint(&buf, 0, 2);     /* format code */
00286 
00287     /* third field */
00288     pq_sendstring(&buf, "xlogpos");
00289     pq_sendint(&buf, 0, 4);
00290     pq_sendint(&buf, 0, 2);
00291     pq_sendint(&buf, TEXTOID, 4);
00292     pq_sendint(&buf, -1, 2);
00293     pq_sendint(&buf, 0, 4);
00294     pq_sendint(&buf, 0, 2);
00295     pq_endmessage(&buf);
00296 
00297     /* Send a DataRow message */
00298     pq_beginmessage(&buf, 'D');
00299     pq_sendint(&buf, 3, 2);     /* # of columns */
00300     pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
00301     pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
00302     pq_sendint(&buf, strlen(tli), 4);   /* col2 len */
00303     pq_sendbytes(&buf, (char *) tli, strlen(tli));
00304     pq_sendint(&buf, strlen(xpos), 4);  /* col3 len */
00305     pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
00306 
00307     pq_endmessage(&buf);
00308 }
00309 
00310 
00311 /*
00312  * Handle TIMELINE_HISTORY command.
00313  */
00314 static void
00315 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
00316 {
00317     StringInfoData buf;
00318     char        histfname[MAXFNAMELEN];
00319     char        path[MAXPGPATH];
00320     int         fd;
00321     off_t       histfilelen;
00322     off_t       bytesleft;
00323 
00324     /*
00325      * Reply with a result set with one row, and two columns. The first col
00326      * is the name of the history file, 2nd is the contents.
00327      */
00328 
00329     TLHistoryFileName(histfname, cmd->timeline);
00330     TLHistoryFilePath(path, cmd->timeline);
00331 
00332     /* Send a RowDescription message */
00333     pq_beginmessage(&buf, 'T');
00334     pq_sendint(&buf, 2, 2);     /* 2 fields */
00335 
00336     /* first field */
00337     pq_sendstring(&buf, "filename");    /* col name */
00338     pq_sendint(&buf, 0, 4);     /* table oid */
00339     pq_sendint(&buf, 0, 2);     /* attnum */
00340     pq_sendint(&buf, TEXTOID, 4);       /* type oid */
00341     pq_sendint(&buf, -1, 2);    /* typlen */
00342     pq_sendint(&buf, 0, 4);     /* typmod */
00343     pq_sendint(&buf, 0, 2);     /* format code */
00344 
00345     /* second field */
00346     pq_sendstring(&buf, "content"); /* col name */
00347     pq_sendint(&buf, 0, 4);     /* table oid */
00348     pq_sendint(&buf, 0, 2);     /* attnum */
00349     pq_sendint(&buf, BYTEAOID, 4);      /* type oid */
00350     pq_sendint(&buf, -1, 2);    /* typlen */
00351     pq_sendint(&buf, 0, 4);     /* typmod */
00352     pq_sendint(&buf, 0, 2);     /* format code */
00353     pq_endmessage(&buf);
00354 
00355     /* Send a DataRow message */
00356     pq_beginmessage(&buf, 'D');
00357     pq_sendint(&buf, 2, 2);     /* # of columns */
00358     pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
00359     pq_sendbytes(&buf, histfname, strlen(histfname));
00360 
00361     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
00362     if (fd < 0)
00363         ereport(ERROR,
00364                 (errcode_for_file_access(),
00365                  errmsg("could not open file \"%s\": %m", path)));
00366 
00367     /* Determine file length and send it to client */
00368     histfilelen = lseek(fd, 0, SEEK_END);
00369     if (histfilelen < 0)
00370         ereport(ERROR,
00371                 (errcode_for_file_access(),
00372                  errmsg("could not seek to end of file \"%s\": %m", path)));
00373     if (lseek(fd, 0, SEEK_SET) != 0)
00374         ereport(ERROR,
00375                 (errcode_for_file_access(),
00376                  errmsg("could not seek to beginning of file \"%s\": %m", path)));
00377 
00378     pq_sendint(&buf, histfilelen, 4);   /* col2 len */
00379 
00380     bytesleft = histfilelen;
00381     while (bytesleft > 0)
00382     {
00383         char rbuf[BLCKSZ];
00384         int nread;
00385 
00386         nread = read(fd, rbuf, sizeof(rbuf));
00387         if (nread <= 0)
00388             ereport(ERROR,
00389                     (errcode_for_file_access(),
00390                      errmsg("could not read file \"%s\": %m",
00391                             path)));
00392         pq_sendbytes(&buf, rbuf, nread);
00393         bytesleft -= nread;
00394     }
00395     CloseTransientFile(fd);
00396 
00397     pq_endmessage(&buf);
00398 }
00399 
00400 /*
00401  * Handle START_REPLICATION command.
00402  *
00403  * At the moment, this never returns, but an ereport(ERROR) will take us back
00404  * to the main loop.
00405  */
00406 static void
00407 StartReplication(StartReplicationCmd *cmd)
00408 {
00409     StringInfoData buf;
00410     XLogRecPtr FlushPtr;
00411 
00412     /*
00413      * We assume here that we're logging enough information in the WAL for
00414      * log-shipping, since this is checked in PostmasterMain().
00415      *
00416      * NOTE: wal_level can only change at shutdown, so in most cases it is
00417      * difficult for there to be WAL data that we can still see that was
00418      * written at wal_level='minimal'.
00419      */
00420 
00421     /*
00422      * Select the timeline. If it was given explicitly by the client, use
00423      * that. Otherwise use the timeline of the last replayed record, which
00424      * is kept in ThisTimeLineID.
00425      */
00426     if (am_cascading_walsender)
00427     {
00428         /* this also updates ThisTimeLineID */
00429         FlushPtr = GetStandbyFlushRecPtr();
00430     }
00431     else
00432         FlushPtr = GetFlushRecPtr();
00433 
00434     if (cmd->timeline != 0)
00435     {
00436         XLogRecPtr  switchpoint;
00437 
00438         sendTimeLine = cmd->timeline;
00439         if (sendTimeLine == ThisTimeLineID)
00440         {
00441             sendTimeLineIsHistoric = false;
00442             sendTimeLineValidUpto = InvalidXLogRecPtr;
00443         }
00444         else
00445         {
00446             List       *timeLineHistory;
00447 
00448             sendTimeLineIsHistoric = true;
00449 
00450             /*
00451              * Check that the timeline the client requested for exists, and the
00452              * requested start location is on that timeline.
00453              */
00454             timeLineHistory = readTimeLineHistory(ThisTimeLineID);
00455             switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
00456                                          &sendTimeLineNextTLI);
00457             list_free_deep(timeLineHistory);
00458 
00459             /*
00460              * Found the requested timeline in the history. Check that
00461              * requested startpoint is on that timeline in our history.
00462              *
00463              * This is quite loose on purpose. We only check that we didn't
00464              * fork off the requested timeline before the switchpoint. We don't
00465              * check that we switched *to* it before the requested starting
00466              * point. This is because the client can legitimately request to
00467              * start replication from the beginning of the WAL segment that
00468              * contains switchpoint, but on the new timeline, so that it
00469              * doesn't end up with a partial segment. If you ask for a too old
00470              * starting point, you'll get an error later when we fail to find
00471              * the requested WAL segment in pg_xlog.
00472              *
00473              * XXX: we could be more strict here and only allow a startpoint
00474              * that's older than the switchpoint, if it it's still in the same
00475              * WAL segment.
00476              */
00477             if (!XLogRecPtrIsInvalid(switchpoint) &&
00478                 switchpoint < cmd->startpoint)
00479             {
00480                 ereport(ERROR,
00481                         (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
00482                                 (uint32) (cmd->startpoint >> 32),
00483                                 (uint32) (cmd->startpoint),
00484                                 cmd->timeline),
00485                          errdetail("This server's history forked from timeline %u at %X/%X",
00486                                    cmd->timeline,
00487                                    (uint32) (switchpoint >> 32),
00488                                    (uint32) (switchpoint))));
00489             }
00490             sendTimeLineValidUpto = switchpoint;
00491         }
00492     }
00493     else
00494     {
00495         sendTimeLine = ThisTimeLineID;
00496         sendTimeLineValidUpto = InvalidXLogRecPtr;
00497         sendTimeLineIsHistoric = false;
00498     }
00499 
00500     streamingDoneSending = streamingDoneReceiving = false;
00501 
00502     /* If there is nothing to stream, don't even enter COPY mode */
00503     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
00504     {
00505         /*
00506          * When we first start replication the standby will be behind the primary.
00507          * For some applications, for example, synchronous replication, it is
00508          * important to have a clear state for this initial catchup mode, so we
00509          * can trigger actions when we change streaming state later. We may stay
00510          * in this state for a long time, which is exactly why we want to be able
00511          * to monitor whether or not we are still here.
00512          */
00513         WalSndSetState(WALSNDSTATE_CATCHUP);
00514 
00515         /* Send a CopyBothResponse message, and start streaming */
00516         pq_beginmessage(&buf, 'W');
00517         pq_sendbyte(&buf, 0);
00518         pq_sendint(&buf, 0, 2);
00519         pq_endmessage(&buf);
00520         pq_flush();
00521 
00522         /*
00523          * Don't allow a request to stream from a future point in WAL that
00524          * hasn't been flushed to disk in this server yet.
00525          */
00526         if (FlushPtr < cmd->startpoint)
00527         {
00528             ereport(ERROR,
00529                     (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
00530                             (uint32) (cmd->startpoint >> 32),
00531                             (uint32) (cmd->startpoint),
00532                             (uint32) (FlushPtr >> 32),
00533                             (uint32) (FlushPtr))));
00534         }
00535 
00536         /* Start streaming from the requested point */
00537         sentPtr = cmd->startpoint;
00538 
00539         /* Initialize shared memory status, too */
00540         {
00541             /* use volatile pointer to prevent code rearrangement */
00542             volatile WalSnd *walsnd = MyWalSnd;
00543 
00544             SpinLockAcquire(&walsnd->mutex);
00545             walsnd->sentPtr = sentPtr;
00546             SpinLockRelease(&walsnd->mutex);
00547         }
00548 
00549         SyncRepInitConfig();
00550 
00551         /* Main loop of walsender */
00552         replication_active = true;
00553 
00554         WalSndLoop();
00555 
00556         replication_active = false;
00557         if (walsender_ready_to_stop)
00558             proc_exit(0);
00559         WalSndSetState(WALSNDSTATE_STARTUP);
00560 
00561         Assert(streamingDoneSending && streamingDoneReceiving);
00562     }
00563 
00564     /*
00565      * Copy is finished now. Send a single-row result set indicating the next
00566      * timeline.
00567      */
00568     if (sendTimeLineIsHistoric)
00569     {
00570         char        str[11];
00571         snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);
00572 
00573         pq_beginmessage(&buf, 'T'); /* RowDescription */
00574         pq_sendint(&buf, 1, 2);     /* 1 field */
00575 
00576         /* Field header */
00577         pq_sendstring(&buf, "next_tli");
00578         pq_sendint(&buf, 0, 4);     /* table oid */
00579         pq_sendint(&buf, 0, 2);     /* attnum */
00580         /*
00581          * int8 may seem like a surprising data type for this, but in theory
00582          * int4 would not be wide enough for this, as TimeLineID is unsigned.
00583          */
00584         pq_sendint(&buf, INT8OID, 4);   /* type oid */
00585         pq_sendint(&buf, -1, 2);
00586         pq_sendint(&buf, 0, 4);
00587         pq_sendint(&buf, 0, 2);
00588         pq_endmessage(&buf);
00589 
00590         /* Data row */
00591         pq_beginmessage(&buf, 'D');
00592         pq_sendint(&buf, 1, 2);     /* number of columns */
00593         pq_sendint(&buf, strlen(str), 4);   /* length */
00594         pq_sendbytes(&buf, str, strlen(str));
00595         pq_endmessage(&buf);
00596     }
00597 
00598     /* Send CommandComplete message */
00599     pq_puttextmessage('C', "START_STREAMING");
00600 }
00601 
00602 /*
00603  * Execute an incoming replication command.
00604  */
00605 void
00606 exec_replication_command(const char *cmd_string)
00607 {
00608     int         parse_rc;
00609     Node       *cmd_node;
00610     MemoryContext cmd_context;
00611     MemoryContext old_context;
00612 
00613     elog(DEBUG1, "received replication command: %s", cmd_string);
00614 
00615     CHECK_FOR_INTERRUPTS();
00616 
00617     cmd_context = AllocSetContextCreate(CurrentMemoryContext,
00618                                         "Replication command context",
00619                                         ALLOCSET_DEFAULT_MINSIZE,
00620                                         ALLOCSET_DEFAULT_INITSIZE,
00621                                         ALLOCSET_DEFAULT_MAXSIZE);
00622     old_context = MemoryContextSwitchTo(cmd_context);
00623 
00624     replication_scanner_init(cmd_string);
00625     parse_rc = replication_yyparse();
00626     if (parse_rc != 0)
00627         ereport(ERROR,
00628                 (errcode(ERRCODE_SYNTAX_ERROR),
00629                  (errmsg_internal("replication command parser returned %d",
00630                                   parse_rc))));
00631 
00632     cmd_node = replication_parse_result;
00633 
00634     switch (cmd_node->type)
00635     {
00636         case T_IdentifySystemCmd:
00637             IdentifySystem();
00638             break;
00639 
00640         case T_StartReplicationCmd:
00641             StartReplication((StartReplicationCmd *) cmd_node);
00642             break;
00643 
00644         case T_BaseBackupCmd:
00645             SendBaseBackup((BaseBackupCmd *) cmd_node);
00646             break;
00647 
00648         case T_TimeLineHistoryCmd:
00649             SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
00650             break;
00651 
00652         default:
00653             elog(ERROR, "unrecognized replication command node tag: %u",
00654                  cmd_node->type);
00655     }
00656 
00657     /* done */
00658     MemoryContextSwitchTo(old_context);
00659     MemoryContextDelete(cmd_context);
00660 
00661     /* Send CommandComplete message */
00662     EndCommand("SELECT", DestRemote);
00663 }
00664 
00665 /*
00666  * Process any incoming messages while streaming. Also checks if the remote
00667  * end has closed the connection.
00668  */
00669 static void
00670 ProcessRepliesIfAny(void)
00671 {
00672     unsigned char firstchar;
00673     int         r;
00674     bool        received = false;
00675 
00676     for (;;)
00677     {
00678         r = pq_getbyte_if_available(&firstchar);
00679         if (r < 0)
00680         {
00681             /* unexpected error or EOF */
00682             ereport(COMMERROR,
00683                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
00684                      errmsg("unexpected EOF on standby connection")));
00685             proc_exit(0);
00686         }
00687         if (r == 0)
00688         {
00689             /* no data available without blocking */
00690             break;
00691         }
00692 
00693         /*
00694          * If we already received a CopyDone from the frontend, the frontend
00695          * should not send us anything until we've closed our end of the COPY.
00696          * XXX: In theory, the frontend could already send the next command
00697          * before receiving the CopyDone, but libpq doesn't currently allow
00698          * that.
00699          */
00700         if (streamingDoneReceiving && firstchar != 'X')
00701             ereport(FATAL,
00702                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
00703                      errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
00704                             firstchar)));
00705 
00706         /* Handle the very limited subset of commands expected in this phase */
00707         switch (firstchar)
00708         {
00709                 /*
00710                  * 'd' means a standby reply wrapped in a CopyData packet.
00711                  */
00712             case 'd':
00713                 ProcessStandbyMessage();
00714                 received = true;
00715                 break;
00716 
00717                 /*
00718                  * CopyDone means the standby requested to finish streaming.
00719                  * Reply with CopyDone, if we had not sent that already.
00720                  */
00721             case 'c':
00722                 if (!streamingDoneSending)
00723                 {
00724                     pq_putmessage_noblock('c', NULL, 0);
00725                     streamingDoneSending = true;
00726                 }
00727 
00728                 /* consume the CopyData message */
00729                 resetStringInfo(&reply_message);
00730                 if (pq_getmessage(&reply_message, 0))
00731                 {
00732                     ereport(COMMERROR,
00733                             (errcode(ERRCODE_PROTOCOL_VIOLATION),
00734                              errmsg("unexpected EOF on standby connection")));
00735                     proc_exit(0);
00736                 }
00737 
00738                 streamingDoneReceiving = true;
00739                 received = true;
00740                 break;
00741 
00742                 /*
00743                  * 'X' means that the standby is closing down the socket.
00744                  */
00745             case 'X':
00746                 proc_exit(0);
00747 
00748             default:
00749                 ereport(FATAL,
00750                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
00751                          errmsg("invalid standby message type \"%c\"",
00752                                 firstchar)));
00753         }
00754     }
00755 
00756     /*
00757      * Save the last reply timestamp if we've received at least one reply.
00758      */
00759     if (received)
00760     {
00761         last_reply_timestamp = GetCurrentTimestamp();
00762         ping_sent = false;
00763     }
00764 }
00765 
00766 /*
00767  * Process a status update message received from standby.
00768  */
00769 static void
00770 ProcessStandbyMessage(void)
00771 {
00772     char        msgtype;
00773 
00774     resetStringInfo(&reply_message);
00775 
00776     /*
00777      * Read the message contents.
00778      */
00779     if (pq_getmessage(&reply_message, 0))
00780     {
00781         ereport(COMMERROR,
00782                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
00783                  errmsg("unexpected EOF on standby connection")));
00784         proc_exit(0);
00785     }
00786 
00787     /*
00788      * Check message type from the first byte.
00789      */
00790     msgtype = pq_getmsgbyte(&reply_message);
00791 
00792     switch (msgtype)
00793     {
00794         case 'r':
00795             ProcessStandbyReplyMessage();
00796             break;
00797 
00798         case 'h':
00799             ProcessStandbyHSFeedbackMessage();
00800             break;
00801 
00802         default:
00803             ereport(COMMERROR,
00804                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
00805                      errmsg("unexpected message type \"%c\"", msgtype)));
00806             proc_exit(0);
00807     }
00808 }
00809 
00810 /*
00811  * Regular reply from standby advising of WAL positions on standby server.
00812  */
00813 static void
00814 ProcessStandbyReplyMessage(void)
00815 {
00816     XLogRecPtr  writePtr,
00817                 flushPtr,
00818                 applyPtr;
00819     bool        replyRequested;
00820 
00821     /* the caller already consumed the msgtype byte */
00822     writePtr = pq_getmsgint64(&reply_message);
00823     flushPtr = pq_getmsgint64(&reply_message);
00824     applyPtr = pq_getmsgint64(&reply_message);
00825     (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
00826     replyRequested = pq_getmsgbyte(&reply_message);
00827 
00828     elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
00829          (uint32) (writePtr >> 32), (uint32) writePtr,
00830          (uint32) (flushPtr >> 32), (uint32) flushPtr,
00831          (uint32) (applyPtr >> 32), (uint32) applyPtr,
00832          replyRequested ? " (reply requested)" : "");
00833 
00834     /* Send a reply if the standby requested one. */
00835     if (replyRequested)
00836         WalSndKeepalive(false);
00837 
00838     /*
00839      * Update shared state for this WalSender process based on reply data from
00840      * standby.
00841      */
00842     {
00843         /* use volatile pointer to prevent code rearrangement */
00844         volatile WalSnd *walsnd = MyWalSnd;
00845 
00846         SpinLockAcquire(&walsnd->mutex);
00847         walsnd->write = writePtr;
00848         walsnd->flush = flushPtr;
00849         walsnd->apply = applyPtr;
00850         SpinLockRelease(&walsnd->mutex);
00851     }
00852 
00853     if (!am_cascading_walsender)
00854         SyncRepReleaseWaiters();
00855 }
00856 
00857 /*
00858  * Hot Standby feedback
00859  */
00860 static void
00861 ProcessStandbyHSFeedbackMessage(void)
00862 {
00863     TransactionId nextXid;
00864     uint32      nextEpoch;
00865     TransactionId feedbackXmin;
00866     uint32      feedbackEpoch;
00867 
00868     /*
00869      * Decipher the reply message. The caller already consumed the msgtype
00870      * byte.
00871      */
00872     (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
00873     feedbackXmin = pq_getmsgint(&reply_message, 4);
00874     feedbackEpoch = pq_getmsgint(&reply_message, 4);
00875 
00876     elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
00877          feedbackXmin,
00878          feedbackEpoch);
00879 
00880     /* Unset WalSender's xmin if the feedback message value is invalid */
00881     if (!TransactionIdIsNormal(feedbackXmin))
00882     {
00883         MyPgXact->xmin = InvalidTransactionId;
00884         return;
00885     }
00886 
00887     /*
00888      * Check that the provided xmin/epoch are sane, that is, not in the future
00889      * and not so far back as to be already wrapped around.  Ignore if not.
00890      *
00891      * Epoch of nextXid should be same as standby, or if the counter has
00892      * wrapped, then one greater than standby.
00893      */
00894     GetNextXidAndEpoch(&nextXid, &nextEpoch);
00895 
00896     if (feedbackXmin <= nextXid)
00897     {
00898         if (feedbackEpoch != nextEpoch)
00899             return;
00900     }
00901     else
00902     {
00903         if (feedbackEpoch + 1 != nextEpoch)
00904             return;
00905     }
00906 
00907     if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
00908         return;                 /* epoch OK, but it's wrapped around */
00909 
00910     /*
00911      * Set the WalSender's xmin equal to the standby's requested xmin, so that
00912      * the xmin will be taken into account by GetOldestXmin.  This will hold
00913      * back the removal of dead rows and thereby prevent the generation of
00914      * cleanup conflicts on the standby server.
00915      *
00916      * There is a small window for a race condition here: although we just
00917      * checked that feedbackXmin precedes nextXid, the nextXid could have gotten
00918      * advanced between our fetching it and applying the xmin below, perhaps
00919      * far enough to make feedbackXmin wrap around.  In that case the xmin we
00920      * set here would be "in the future" and have no effect.  No point in
00921      * worrying about this since it's too late to save the desired data
00922      * anyway.  Assuming that the standby sends us an increasing sequence of
00923      * xmins, this could only happen during the first reply cycle, else our
00924      * own xmin would prevent nextXid from advancing so far.
00925      *
00926      * We don't bother taking the ProcArrayLock here.  Setting the xmin field
00927      * is assumed atomic, and there's no real need to prevent a concurrent
00928      * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
00929      * safe, and if we're moving it backwards, well, the data is at risk
00930      * already since a VACUUM could have just finished calling GetOldestXmin.)
00931      */
00932     MyPgXact->xmin = feedbackXmin;
00933 }
00934 
00935 /* Main loop of walsender process that streams the WAL over Copy messages. */
00936 static void
00937 WalSndLoop(void)
00938 {
00939     bool        caughtup = false;
00940 
00941     /*
00942      * Allocate buffers that will be used for each outgoing and incoming
00943      * message.  We do this just once to reduce palloc overhead.
00944      */
00945     initStringInfo(&output_message);
00946     initStringInfo(&reply_message);
00947     initStringInfo(&tmpbuf);
00948 
00949     /* Initialize the last reply timestamp */
00950     last_reply_timestamp = GetCurrentTimestamp();
00951     ping_sent = false;
00952 
00953     /*
00954      * Loop until we reach the end of this timeline or the client requests
00955      * to stop streaming.
00956      */
00957     for (;;)
00958     {
00959         /* Clear any already-pending wakeups */
00960         ResetLatch(&MyWalSnd->latch);
00961 
00962         /*
00963          * Emergency bailout if postmaster has died.  This is to avoid the
00964          * necessity for manual cleanup of all postmaster children.
00965          */
00966         if (!PostmasterIsAlive())
00967             exit(1);
00968 
00969         /* Process any requests or signals received recently */
00970         if (got_SIGHUP)
00971         {
00972             got_SIGHUP = false;
00973             ProcessConfigFile(PGC_SIGHUP);
00974             SyncRepInitConfig();
00975         }
00976 
00977         CHECK_FOR_INTERRUPTS();
00978 
00979         /* Check for input from the client */
00980         ProcessRepliesIfAny();
00981 
00982         /*
00983          * If we have received CopyDone from the client, sent CopyDone
00984          * ourselves, and the output buffer is empty, it's time to exit
00985          * streaming.
00986          */
00987         if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
00988             break;
00989 
00990         /*
00991          * If we don't have any pending data in the output buffer, try to send
00992          * some more.  If there is some, we don't bother to call XLogSend
00993          * again until we've flushed it ... but we'd better assume we are not
00994          * caught up.
00995          */
00996         if (!pq_is_send_pending())
00997             XLogSend(&caughtup);
00998         else
00999             caughtup = false;
01000 
01001         /* Try to flush pending output to the client */
01002         if (pq_flush_if_writable() != 0)
01003             goto send_failure;
01004 
01005         /* If nothing remains to be sent right now ... */
01006         if (caughtup && !pq_is_send_pending())
01007         {
01008             /*
01009              * If we're in catchup state, move to streaming.  This is an
01010              * important state change for users to know about, since before
01011              * this point data loss might occur if the primary dies and we
01012              * need to failover to the standby. The state change is also
01013              * important for synchronous replication, since commits that
01014              * started to wait at that point might wait for some time.
01015              */
01016             if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
01017             {
01018                 ereport(DEBUG1,
01019                      (errmsg("standby \"%s\" has now caught up with primary",
01020                              application_name)));
01021                 WalSndSetState(WALSNDSTATE_STREAMING);
01022             }
01023 
01024             /*
01025              * When SIGUSR2 arrives, we send any outstanding logs up to the
01026              * shutdown checkpoint record (i.e., the latest record) and exit.
01027              * This may be a normal termination at shutdown, or a promotion,
01028              * the walsender is not sure which.
01029              */
01030             if (walsender_ready_to_stop)
01031             {
01032                 /* ... let's just be real sure we're caught up ... */
01033                 XLogSend(&caughtup);
01034                 if (caughtup && !pq_is_send_pending())
01035                 {
01036                     /* Inform the standby that XLOG streaming is done */
01037                     EndCommand("COPY 0", DestRemote);
01038                     pq_flush();
01039 
01040                     proc_exit(0);
01041                 }
01042             }
01043         }
01044 
01045         /*
01046          * We don't block if not caught up, unless there is unsent data
01047          * pending in which case we'd better block until the socket is
01048          * write-ready.  This test is only needed for the case where XLogSend
01049          * loaded a subset of the available data but then pq_flush_if_writable
01050          * flushed it all --- we should immediately try to send more.
01051          */
01052         if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
01053         {
01054             TimestampTz timeout = 0;
01055             long        sleeptime = 10000;      /* 10 s */
01056             int         wakeEvents;
01057 
01058             wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
01059                 WL_SOCKET_READABLE;
01060 
01061             if (pq_is_send_pending())
01062                 wakeEvents |= WL_SOCKET_WRITEABLE;
01063             else if (wal_sender_timeout > 0 && !ping_sent)
01064             {
01065                 /*
01066                  * If half of wal_sender_timeout has lapsed without receiving
01067                  * any reply from standby, send a keep-alive message to standby
01068                  * requesting an immediate reply.
01069                  */
01070                 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
01071                                                       wal_sender_timeout / 2);
01072                 if (GetCurrentTimestamp() >= timeout)
01073                 {
01074                     WalSndKeepalive(true);
01075                     ping_sent = true;
01076                     /* Try to flush pending output to the client */
01077                     if (pq_flush_if_writable() != 0)
01078                         break;
01079                 }
01080             }
01081 
01082             /* Determine time until replication timeout */
01083             if (wal_sender_timeout > 0)
01084             {
01085                 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
01086                                                       wal_sender_timeout);
01087                 sleeptime = 1 + (wal_sender_timeout / 10);
01088             }
01089 
01090             /* Sleep until something happens or we time out */
01091             ImmediateInterruptOK = true;
01092             CHECK_FOR_INTERRUPTS();
01093             WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
01094                               MyProcPort->sock, sleeptime);
01095             ImmediateInterruptOK = false;
01096 
01097             /*
01098              * Check for replication timeout.  Note we ignore the corner case
01099              * possibility that the client replied just as we reached the
01100              * timeout ... he's supposed to reply *before* that.
01101              */
01102             if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
01103             {
01104                 /*
01105                  * Since typically expiration of replication timeout means
01106                  * communication problem, we don't send the error message to
01107                  * the standby.
01108                  */
01109                 ereport(COMMERROR,
01110                         (errmsg("terminating walsender process due to replication timeout")));
01111                 goto send_failure;
01112             }
01113         }
01114     }
01115     return;
01116 
01117 send_failure:
01118     /*
01119      * Get here on send failure.  Clean up and exit.
01120      *
01121      * Reset whereToSendOutput to prevent ereport from attempting to send any
01122      * more messages to the standby.
01123      */
01124     if (whereToSendOutput == DestRemote)
01125         whereToSendOutput = DestNone;
01126 
01127     proc_exit(0);
01128     abort();                    /* keep the compiler quiet */
01129 }
01130 
01131 /* Initialize a per-walsender data structure for this walsender process */
01132 static void
01133 InitWalSenderSlot(void)
01134 {
01135     int         i;
01136 
01137     /*
01138      * WalSndCtl should be set up already (we inherit this by fork() or
01139      * EXEC_BACKEND mechanism from the postmaster).
01140      */
01141     Assert(WalSndCtl != NULL);
01142     Assert(MyWalSnd == NULL);
01143 
01144     /*
01145      * Find a free walsender slot and reserve it. If this fails, we must be
01146      * out of WalSnd structures.
01147      */
01148     for (i = 0; i < max_wal_senders; i++)
01149     {
01150         /* use volatile pointer to prevent code rearrangement */
01151         volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01152 
01153         SpinLockAcquire(&walsnd->mutex);
01154 
01155         if (walsnd->pid != 0)
01156         {
01157             SpinLockRelease(&walsnd->mutex);
01158             continue;
01159         }
01160         else
01161         {
01162             /*
01163              * Found a free slot. Reserve it for us.
01164              */
01165             walsnd->pid = MyProcPid;
01166             walsnd->sentPtr = InvalidXLogRecPtr;
01167             walsnd->state = WALSNDSTATE_STARTUP;
01168             SpinLockRelease(&walsnd->mutex);
01169             /* don't need the lock anymore */
01170             OwnLatch((Latch *) &walsnd->latch);
01171             MyWalSnd = (WalSnd *) walsnd;
01172 
01173             break;
01174         }
01175     }
01176     if (MyWalSnd == NULL)
01177         ereport(FATAL,
01178                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
01179                  errmsg("number of requested standby connections "
01180                         "exceeds max_wal_senders (currently %d)",
01181                         max_wal_senders)));
01182 
01183     /* Arrange to clean up at walsender exit */
01184     on_shmem_exit(WalSndKill, 0);
01185 }
01186 
01187 /* Destroy the per-walsender data structure for this walsender process */
01188 static void
01189 WalSndKill(int code, Datum arg)
01190 {
01191     Assert(MyWalSnd != NULL);
01192 
01193     /*
01194      * Mark WalSnd struct no longer in use. Assume that no lock is required
01195      * for this.
01196      */
01197     MyWalSnd->pid = 0;
01198     DisownLatch(&MyWalSnd->latch);
01199 
01200     /* WalSnd struct isn't mine anymore */
01201     MyWalSnd = NULL;
01202 }
01203 
01204 /*
01205  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
01206  *
01207  * XXX probably this should be improved to suck data directly from the
01208  * WAL buffers when possible.
01209  *
01210  * Will open, and keep open, one WAL segment stored in the global file
01211  * descriptor sendFile. This means if XLogRead is used once, there will
01212  * always be one descriptor left open until the process ends, but never
01213  * more than one.
01214  */
01215 static void
01216 XLogRead(char *buf, XLogRecPtr startptr, Size count)
01217 {
01218     char       *p;
01219     XLogRecPtr  recptr;
01220     Size        nbytes;
01221     XLogSegNo   segno;
01222 
01223 retry:
01224     p = buf;
01225     recptr = startptr;
01226     nbytes = count;
01227 
01228     while (nbytes > 0)
01229     {
01230         uint32      startoff;
01231         int         segbytes;
01232         int         readbytes;
01233 
01234         startoff = recptr % XLogSegSize;
01235 
01236         if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
01237         {
01238             char        path[MAXPGPATH];
01239 
01240             /* Switch to another logfile segment */
01241             if (sendFile >= 0)
01242                 close(sendFile);
01243 
01244             XLByteToSeg(recptr, sendSegNo);
01245 
01246             /*-------
01247              * When reading from a historic timeline, and there is a timeline
01248              * switch within this segment, read from the WAL segment belonging
01249              * to the new timeline.
01250              *
01251              * For example, imagine that this server is currently on timeline
01252              * 5, and we're streaming timeline 4. The switch from timeline 4
01253              * to 5 happened at 0/13002088. In pg_xlog, we have these files:
01254              *
01255              * ...
01256              * 000000040000000000000012
01257              * 000000040000000000000013
01258              * 000000050000000000000013
01259              * 000000050000000000000014
01260              * ...
01261              *
01262              * In this situation, when requested to send the WAL from
01263              * segment 0x13, on timeline 4, we read the WAL from file
01264              * 000000050000000000000013. Archive recovery prefers files from
01265              * newer timelines, so if the segment was restored from the
01266              * archive on this server, the file belonging to the old timeline,
01267              * 000000040000000000000013, might not exist. Their contents are
01268              * equal up to the switchpoint, because at a timeline switch, the
01269              * used portion of the old segment is copied to the new file.
01270              *-------
01271              */
01272             curFileTimeLine = sendTimeLine;
01273             if (sendTimeLineIsHistoric)
01274             {
01275                 XLogSegNo endSegNo;
01276 
01277                 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
01278                 if (sendSegNo == endSegNo)
01279                     curFileTimeLine = sendTimeLineNextTLI;
01280             }
01281 
01282             XLogFilePath(path, curFileTimeLine, sendSegNo);
01283 
01284             sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
01285             if (sendFile < 0)
01286             {
01287                 /*
01288                  * If the file is not found, assume it's because the standby
01289                  * asked for a too old WAL segment that has already been
01290                  * removed or recycled.
01291                  */
01292                 if (errno == ENOENT)
01293                     ereport(ERROR,
01294                             (errcode_for_file_access(),
01295                              errmsg("requested WAL segment %s has already been removed",
01296                                     XLogFileNameP(curFileTimeLine, sendSegNo))));
01297                 else
01298                     ereport(ERROR,
01299                             (errcode_for_file_access(),
01300                              errmsg("could not open file \"%s\": %m",
01301                                     path)));
01302             }
01303             sendOff = 0;
01304         }
01305 
01306         /* Need to seek in the file? */
01307         if (sendOff != startoff)
01308         {
01309             if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
01310                 ereport(ERROR,
01311                         (errcode_for_file_access(),
01312                          errmsg("could not seek in log segment %s to offset %u: %m",
01313                                 XLogFileNameP(curFileTimeLine, sendSegNo),
01314                                 startoff)));
01315             sendOff = startoff;
01316         }
01317 
01318         /* How many bytes are within this segment? */
01319         if (nbytes > (XLogSegSize - startoff))
01320             segbytes = XLogSegSize - startoff;
01321         else
01322             segbytes = nbytes;
01323 
01324         readbytes = read(sendFile, p, segbytes);
01325         if (readbytes <= 0)
01326         {
01327             ereport(ERROR,
01328                     (errcode_for_file_access(),
01329             errmsg("could not read from log segment %s, offset %u, length %lu: %m",
01330                    XLogFileNameP(curFileTimeLine, sendSegNo),
01331                    sendOff, (unsigned long) segbytes)));
01332         }
01333 
01334         /* Update state for read */
01335         recptr += readbytes;
01336 
01337         sendOff += readbytes;
01338         nbytes -= readbytes;
01339         p += readbytes;
01340     }
01341 
01342     /*
01343      * After reading into the buffer, check that what we read was valid. We do
01344      * this after reading, because even though the segment was present when we
01345      * opened it, it might get recycled or removed while we read it. The
01346      * read() succeeds in that case, but the data we tried to read might
01347      * already have been overwritten with new WAL records.
01348      */
01349     XLByteToSeg(startptr, segno);
01350     CheckXLogRemoved(segno, ThisTimeLineID);
01351 
01352     /*
01353      * During recovery, the currently-open WAL file might be replaced with the
01354      * file of the same name retrieved from archive. So we always need to
01355      * check what we read was valid after reading into the buffer. If it's
01356      * invalid, we try to open and read the file again.
01357      */
01358     if (am_cascading_walsender)
01359     {
01360         /* use volatile pointer to prevent code rearrangement */
01361         volatile WalSnd *walsnd = MyWalSnd;
01362         bool        reload;
01363 
01364         SpinLockAcquire(&walsnd->mutex);
01365         reload = walsnd->needreload;
01366         walsnd->needreload = false;
01367         SpinLockRelease(&walsnd->mutex);
01368 
01369         if (reload && sendFile >= 0)
01370         {
01371             close(sendFile);
01372             sendFile = -1;
01373 
01374             goto retry;
01375         }
01376     }
01377 }
01378 
01379 /*
01380  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
01381  * but not yet sent to the client, and buffer it in the libpq output
01382  * buffer.
01383  *
01384  * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
01385  * *caughtup is set to false.
01386  */
01387 static void
01388 XLogSend(bool *caughtup)
01389 {
01390     XLogRecPtr  SendRqstPtr;
01391     XLogRecPtr  startptr;
01392     XLogRecPtr  endptr;
01393     Size        nbytes;
01394 
01395     if (streamingDoneSending)
01396     {
01397         *caughtup = true;
01398         return;
01399     }
01400 
01401     /* Figure out how far we can safely send the WAL. */
01402     if (sendTimeLineIsHistoric)
01403     {
01404         /*
01405          * Streaming an old timeline timeline that's in this server's history,
01406          * but is not the one we're currently inserting or replaying. It can
01407          * be streamed up to the point where we switched off that timeline.
01408          */
01409         SendRqstPtr = sendTimeLineValidUpto;
01410     }
01411     else if (am_cascading_walsender)
01412     {
01413         /*
01414          * Streaming the latest timeline on a standby.
01415          *
01416          * Attempt to send all WAL that has already been replayed, so that
01417          * we know it's valid. If we're receiving WAL through streaming
01418          * replication, it's also OK to send any WAL that has been received
01419          * but not replayed.
01420          *
01421          * The timeline we're recovering from can change, or we can be
01422          * promoted. In either case, the current timeline becomes historic.
01423          * We need to detect that so that we don't try to stream past the
01424          * point where we switched to another timeline. We check for promotion
01425          * or timeline switch after calculating FlushPtr, to avoid a race
01426          * condition: if the timeline becomes historic just after we checked
01427          * that it was still current, it's still be OK to stream it up to the
01428          * FlushPtr that was calculated before it became historic.
01429          */
01430         bool        becameHistoric = false;
01431 
01432         SendRqstPtr = GetStandbyFlushRecPtr();
01433 
01434         if (!RecoveryInProgress())
01435         {
01436             /*
01437              * We have been promoted. RecoveryInProgress() updated
01438              * ThisTimeLineID to the new current timeline.
01439              */
01440             am_cascading_walsender = false;
01441             becameHistoric = true;
01442         }
01443         else
01444         {
01445             /*
01446              * Still a cascading standby. But is the timeline we're sending
01447              * still the one recovery is recovering from? ThisTimeLineID was
01448              * updated by the GetStandbyFlushRecPtr() call above.
01449              */
01450             if (sendTimeLine != ThisTimeLineID)
01451                 becameHistoric = true;
01452         }
01453 
01454         if (becameHistoric)
01455         {
01456             /*
01457              * The timeline we were sending has become historic. Read the
01458              * timeline history file of the new timeline to see where exactly
01459              * we forked off from the timeline we were sending.
01460              */
01461             List       *history;
01462 
01463             history = readTimeLineHistory(ThisTimeLineID);
01464             sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
01465             Assert(sentPtr <= sendTimeLineValidUpto);
01466             Assert(sendTimeLine < sendTimeLineNextTLI);
01467             list_free_deep(history);
01468 
01469             /* the current send pointer should be <= the switchpoint */
01470             if (!(sentPtr <= sendTimeLineValidUpto))
01471                 elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
01472                      sendTimeLine,
01473                      (uint32) (sendTimeLineValidUpto >> 32),
01474                      (uint32) sendTimeLineValidUpto,
01475                      (uint32) (sentPtr >> 32),
01476                      (uint32) sentPtr);
01477 
01478             sendTimeLineIsHistoric = true;
01479 
01480             SendRqstPtr = sendTimeLineValidUpto;
01481         }
01482     }
01483     else
01484     {
01485         /*
01486          * Streaming the current timeline on a master.
01487          *
01488          * Attempt to send all data that's already been written out and
01489          * fsync'd to disk.  We cannot go further than what's been written out
01490          * given the current implementation of XLogRead().  And in any case
01491          * it's unsafe to send WAL that is not securely down to disk on the
01492          * master: if the master subsequently crashes and restarts, slaves
01493          * must not have applied any WAL that gets lost on the master.
01494          */
01495         SendRqstPtr = GetFlushRecPtr();
01496     }
01497 
01498     /*
01499      * If this is a historic timeline and we've reached the point where we
01500      * forked to the next timeline, stop streaming.
01501      */
01502     if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
01503     {
01504         /* close the current file. */
01505         if (sendFile >= 0)
01506             close(sendFile);
01507         sendFile = -1;
01508 
01509         /* Send CopyDone */
01510         pq_putmessage_noblock('c', NULL, 0);
01511         streamingDoneSending = true;
01512 
01513         *caughtup = true;
01514         return;
01515     }
01516 
01517     /* Do we have any work to do? */
01518     Assert(sentPtr <= SendRqstPtr);
01519     if (SendRqstPtr <= sentPtr)
01520     {
01521         *caughtup = true;
01522         return;
01523     }
01524 
01525     /*
01526      * Figure out how much to send in one message. If there's no more than
01527      * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
01528      * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
01529      *
01530      * The rounding is not only for performance reasons. Walreceiver relies on
01531      * the fact that we never split a WAL record across two messages. Since a
01532      * long WAL record is split at page boundary into continuation records,
01533      * page boundary is always a safe cut-off point. We also assume that
01534      * SendRqstPtr never points to the middle of a WAL record.
01535      */
01536     startptr = sentPtr;
01537     endptr = startptr;
01538     endptr += MAX_SEND_SIZE;
01539 
01540     /* if we went beyond SendRqstPtr, back off */
01541     if (SendRqstPtr <= endptr)
01542     {
01543         endptr = SendRqstPtr;
01544         if (sendTimeLineIsHistoric)
01545             *caughtup = false;
01546         else
01547             *caughtup = true;
01548     }
01549     else
01550     {
01551         /* round down to page boundary. */
01552         endptr -= (endptr % XLOG_BLCKSZ);
01553         *caughtup = false;
01554     }
01555 
01556     nbytes = endptr - startptr;
01557     Assert(nbytes <= MAX_SEND_SIZE);
01558 
01559     /*
01560      * OK to read and send the slice.
01561      */
01562     resetStringInfo(&output_message);
01563     pq_sendbyte(&output_message, 'w');
01564 
01565     pq_sendint64(&output_message, startptr);    /* dataStart */
01566     pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
01567     pq_sendint64(&output_message, 0);           /* sendtime, filled in last */
01568 
01569     /*
01570      * Read the log directly into the output buffer to avoid extra memcpy
01571      * calls.
01572      */
01573     enlargeStringInfo(&output_message, nbytes);
01574     XLogRead(&output_message.data[output_message.len], startptr, nbytes);
01575     output_message.len += nbytes;
01576     output_message.data[output_message.len] = '\0';
01577 
01578     /*
01579      * Fill the send timestamp last, so that it is taken as late as possible.
01580      */
01581     resetStringInfo(&tmpbuf);
01582     pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
01583     memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
01584            tmpbuf.data, sizeof(int64));
01585 
01586     pq_putmessage_noblock('d', output_message.data, output_message.len);
01587 
01588     sentPtr = endptr;
01589 
01590     /* Update shared memory status */
01591     {
01592         /* use volatile pointer to prevent code rearrangement */
01593         volatile WalSnd *walsnd = MyWalSnd;
01594 
01595         SpinLockAcquire(&walsnd->mutex);
01596         walsnd->sentPtr = sentPtr;
01597         SpinLockRelease(&walsnd->mutex);
01598     }
01599 
01600     /* Report progress of XLOG streaming in PS display */
01601     if (update_process_title)
01602     {
01603         char        activitymsg[50];
01604 
01605         snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
01606                  (uint32) (sentPtr >> 32), (uint32) sentPtr);
01607         set_ps_display(activitymsg, false);
01608     }
01609 
01610     return;
01611 }
01612 
01613 /*
01614  * Returns the latest point in WAL that has been safely flushed to disk, and
01615  * can be sent to the standby. This should only be called when in recovery,
01616  * ie. we're streaming to a cascaded standby.
01617  *
01618  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
01619  * replayed WAL record.
01620  */
01621 static XLogRecPtr
01622 GetStandbyFlushRecPtr(void)
01623 {
01624     XLogRecPtr replayPtr;
01625     TimeLineID replayTLI;
01626     XLogRecPtr receivePtr;
01627     TimeLineID receiveTLI;
01628     XLogRecPtr  result;
01629 
01630     /*
01631      * We can safely send what's already been replayed. Also, if walreceiver
01632      * is streaming WAL from the same timeline, we can send anything that
01633      * it has streamed, but hasn't been replayed yet.
01634      */
01635 
01636     receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
01637     replayPtr = GetXLogReplayRecPtr(&replayTLI);
01638 
01639     ThisTimeLineID = replayTLI;
01640 
01641     result = replayPtr;
01642     if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
01643         result = receivePtr;
01644 
01645     return result;
01646 }
01647 
01648 /*
01649  * Request walsenders to reload the currently-open WAL file
01650  */
01651 void
01652 WalSndRqstFileReload(void)
01653 {
01654     int         i;
01655 
01656     for (i = 0; i < max_wal_senders; i++)
01657     {
01658         /* use volatile pointer to prevent code rearrangement */
01659         volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01660 
01661         if (walsnd->pid == 0)
01662             continue;
01663 
01664         SpinLockAcquire(&walsnd->mutex);
01665         walsnd->needreload = true;
01666         SpinLockRelease(&walsnd->mutex);
01667     }
01668 }
01669 
01670 /* SIGHUP: set flag to re-read config file at next convenient time */
01671 static void
01672 WalSndSigHupHandler(SIGNAL_ARGS)
01673 {
01674     int         save_errno = errno;
01675 
01676     got_SIGHUP = true;
01677     if (MyWalSnd)
01678         SetLatch(&MyWalSnd->latch);
01679 
01680     errno = save_errno;
01681 }
01682 
/*
 * SIGUSR1 handler: request that WAL records be sent.  All the work is
 * delegated to latch_sigusr1_handler(); errno is preserved across the
 * handler.
 */
static void
WalSndXLogSendHandler(SIGNAL_ARGS)
{
    int         saved_errno = errno;

    latch_sigusr1_handler();

    errno = saved_errno;
}
01693 
01694 /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
01695 static void
01696 WalSndLastCycleHandler(SIGNAL_ARGS)
01697 {
01698     int         save_errno = errno;
01699 
01700     /*
01701      * If replication has not yet started, die like with SIGTERM. If
01702      * replication is active, only set a flag and wake up the main loop. It
01703      * will send any outstanding WAL, and then exit gracefully.
01704      */
01705     if (!replication_active)
01706         kill(MyProcPid, SIGTERM);
01707 
01708     walsender_ready_to_stop = true;
01709     if (MyWalSnd)
01710         SetLatch(&MyWalSnd->latch);
01711 
01712     errno = save_errno;
01713 }
01714 
01715 /* Set up signal handlers */
01716 void
01717 WalSndSignals(void)
01718 {
01719     /* Set up signal handlers */
01720     pqsignal(SIGHUP, WalSndSigHupHandler);      /* set flag to read config
01721                                                  * file */
01722     pqsignal(SIGINT, SIG_IGN);  /* not used */
01723     pqsignal(SIGTERM, die);                     /* request shutdown */
01724     pqsignal(SIGQUIT, quickdie);                /* hard crash time */
01725     InitializeTimeouts();       /* establishes SIGALRM handler */
01726     pqsignal(SIGPIPE, SIG_IGN);
01727     pqsignal(SIGUSR1, WalSndXLogSendHandler);   /* request WAL sending */
01728     pqsignal(SIGUSR2, WalSndLastCycleHandler);  /* request a last cycle and
01729                                                  * shutdown */
01730 
01731     /* Reset some signals that are accepted by postmaster but not here */
01732     pqsignal(SIGCHLD, SIG_DFL);
01733     pqsignal(SIGTTIN, SIG_DFL);
01734     pqsignal(SIGTTOU, SIG_DFL);
01735     pqsignal(SIGCONT, SIG_DFL);
01736     pqsignal(SIGWINCH, SIG_DFL);
01737 }
01738 
01739 /* Report shared-memory space needed by WalSndShmemInit */
01740 Size
01741 WalSndShmemSize(void)
01742 {
01743     Size        size = 0;
01744 
01745     size = offsetof(WalSndCtlData, walsnds);
01746     size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
01747 
01748     return size;
01749 }
01750 
01751 /* Allocate and initialize walsender-related shared memory */
01752 void
01753 WalSndShmemInit(void)
01754 {
01755     bool        found;
01756     int         i;
01757 
01758     WalSndCtl = (WalSndCtlData *)
01759         ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
01760 
01761     if (!found)
01762     {
01763         /* First time through, so initialize */
01764         MemSet(WalSndCtl, 0, WalSndShmemSize());
01765 
01766         for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
01767             SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
01768 
01769         for (i = 0; i < max_wal_senders; i++)
01770         {
01771             WalSnd     *walsnd = &WalSndCtl->walsnds[i];
01772 
01773             SpinLockInit(&walsnd->mutex);
01774             InitSharedLatch(&walsnd->latch);
01775         }
01776     }
01777 }
01778 
01779 /*
01780  * Wake up all walsenders
01781  *
01782  * This will be called inside critical sections, so throwing an error is not
01783  * adviseable.
01784  */
01785 void
01786 WalSndWakeup(void)
01787 {
01788     int         i;
01789 
01790     for (i = 0; i < max_wal_senders; i++)
01791         SetLatch(&WalSndCtl->walsnds[i].latch);
01792 }
01793 
01794 /* Set state for current walsender (only called in walsender) */
01795 void
01796 WalSndSetState(WalSndState state)
01797 {
01798     /* use volatile pointer to prevent code rearrangement */
01799     volatile WalSnd *walsnd = MyWalSnd;
01800 
01801     Assert(am_walsender);
01802 
01803     if (walsnd->state == state)
01804         return;
01805 
01806     SpinLockAcquire(&walsnd->mutex);
01807     walsnd->state = state;
01808     SpinLockRelease(&walsnd->mutex);
01809 }
01810 
01811 /*
01812  * Return a string constant representing the state. This is used
01813  * in system views, and should *not* be translated.
01814  */
01815 static const char *
01816 WalSndGetStateString(WalSndState state)
01817 {
01818     switch (state)
01819     {
01820         case WALSNDSTATE_STARTUP:
01821             return "startup";
01822         case WALSNDSTATE_BACKUP:
01823             return "backup";
01824         case WALSNDSTATE_CATCHUP:
01825             return "catchup";
01826         case WALSNDSTATE_STREAMING:
01827             return "streaming";
01828     }
01829     return "UNKNOWN";
01830 }
01831 
01832 
01833 /*
01834  * Returns activity of walsenders, including pids and xlog locations sent to
01835  * standby servers.
01836  */
01837 Datum
01838 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
01839 {
01840 #define PG_STAT_GET_WAL_SENDERS_COLS    8
01841     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
01842     TupleDesc   tupdesc;
01843     Tuplestorestate *tupstore;
01844     MemoryContext per_query_ctx;
01845     MemoryContext oldcontext;
01846     int        *sync_priority;
01847     int         priority = 0;
01848     int         sync_standby = -1;
01849     int         i;
01850 
01851     /* check to see if caller supports us returning a tuplestore */
01852     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
01853         ereport(ERROR,
01854                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01855                  errmsg("set-valued function called in context that cannot accept a set")));
01856     if (!(rsinfo->allowedModes & SFRM_Materialize))
01857         ereport(ERROR,
01858                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
01859                  errmsg("materialize mode required, but it is not " \
01860                         "allowed in this context")));
01861 
01862     /* Build a tuple descriptor for our result type */
01863     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
01864         elog(ERROR, "return type must be a row type");
01865 
01866     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
01867     oldcontext = MemoryContextSwitchTo(per_query_ctx);
01868 
01869     tupstore = tuplestore_begin_heap(true, false, work_mem);
01870     rsinfo->returnMode = SFRM_Materialize;
01871     rsinfo->setResult = tupstore;
01872     rsinfo->setDesc = tupdesc;
01873 
01874     MemoryContextSwitchTo(oldcontext);
01875 
01876     /*
01877      * Get the priorities of sync standbys all in one go, to minimise lock
01878      * acquisitions and to allow us to evaluate who is the current sync
01879      * standby. This code must match the code in SyncRepReleaseWaiters().
01880      */
01881     sync_priority = palloc(sizeof(int) * max_wal_senders);
01882     LWLockAcquire(SyncRepLock, LW_SHARED);
01883     for (i = 0; i < max_wal_senders; i++)
01884     {
01885         /* use volatile pointer to prevent code rearrangement */
01886         volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01887 
01888         if (walsnd->pid != 0)
01889         {
01890             /*
01891              * Treat a standby such as a pg_basebackup background process
01892              * which always returns an invalid flush location, as an
01893              * asynchronous standby.
01894              */
01895             sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
01896                 0 : walsnd->sync_standby_priority;
01897 
01898             if (walsnd->state == WALSNDSTATE_STREAMING &&
01899                 walsnd->sync_standby_priority > 0 &&
01900                 (priority == 0 ||
01901                  priority > walsnd->sync_standby_priority) &&
01902                 !XLogRecPtrIsInvalid(walsnd->flush))
01903             {
01904                 priority = walsnd->sync_standby_priority;
01905                 sync_standby = i;
01906             }
01907         }
01908     }
01909     LWLockRelease(SyncRepLock);
01910 
01911     for (i = 0; i < max_wal_senders; i++)
01912     {
01913         /* use volatile pointer to prevent code rearrangement */
01914         volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
01915         char        location[MAXFNAMELEN];
01916         XLogRecPtr  sentPtr;
01917         XLogRecPtr  write;
01918         XLogRecPtr  flush;
01919         XLogRecPtr  apply;
01920         WalSndState state;
01921         Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
01922         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
01923 
01924         if (walsnd->pid == 0)
01925             continue;
01926 
01927         SpinLockAcquire(&walsnd->mutex);
01928         sentPtr = walsnd->sentPtr;
01929         state = walsnd->state;
01930         write = walsnd->write;
01931         flush = walsnd->flush;
01932         apply = walsnd->apply;
01933         SpinLockRelease(&walsnd->mutex);
01934 
01935         memset(nulls, 0, sizeof(nulls));
01936         values[0] = Int32GetDatum(walsnd->pid);
01937 
01938         if (!superuser())
01939         {
01940             /*
01941              * Only superusers can see details. Other users only get the pid
01942              * value to know it's a walsender, but no details.
01943              */
01944             MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
01945         }
01946         else
01947         {
01948             values[1] = CStringGetTextDatum(WalSndGetStateString(state));
01949 
01950             snprintf(location, sizeof(location), "%X/%X",
01951                      (uint32) (sentPtr >> 32), (uint32) sentPtr);
01952             values[2] = CStringGetTextDatum(location);
01953 
01954             if (write == 0)
01955                 nulls[3] = true;
01956             snprintf(location, sizeof(location), "%X/%X",
01957                      (uint32) (write >> 32), (uint32) write);
01958             values[3] = CStringGetTextDatum(location);
01959 
01960             if (flush == 0)
01961                 nulls[4] = true;
01962             snprintf(location, sizeof(location), "%X/%X",
01963                      (uint32) (flush >> 32), (uint32) flush);
01964             values[4] = CStringGetTextDatum(location);
01965 
01966             if (apply == 0)
01967                 nulls[5] = true;
01968             snprintf(location, sizeof(location), "%X/%X",
01969                      (uint32) (apply >> 32), (uint32) apply);
01970             values[5] = CStringGetTextDatum(location);
01971 
01972             values[6] = Int32GetDatum(sync_priority[i]);
01973 
01974             /*
01975              * More easily understood version of standby state. This is purely
01976              * informational, not different from priority.
01977              */
01978             if (sync_priority[i] == 0)
01979                 values[7] = CStringGetTextDatum("async");
01980             else if (i == sync_standby)
01981                 values[7] = CStringGetTextDatum("sync");
01982             else
01983                 values[7] = CStringGetTextDatum("potential");
01984         }
01985 
01986         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
01987     }
01988     pfree(sync_priority);
01989 
01990     /* clean up and return the tuplestore */
01991     tuplestore_donestoring(tupstore);
01992 
01993     return (Datum) 0;
01994 }
01995 
01996 /*
01997   * This function is used to send keepalive message to standby.
01998   * If requestReply is set, sets a flag in the message requesting the standby
01999   * to send a message back to us, for heartbeat purposes.
02000   */
02001 static void
02002 WalSndKeepalive(bool requestReply)
02003 {
02004     elog(DEBUG2, "sending replication keepalive");
02005 
02006     /* construct the message... */
02007     resetStringInfo(&output_message);
02008     pq_sendbyte(&output_message, 'k');
02009     pq_sendint64(&output_message, sentPtr);
02010     pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
02011     pq_sendbyte(&output_message, requestReply ? 1 : 0);
02012 
02013     /* ... and send it wrapped in CopyData */
02014     pq_putmessage_noblock('d', output_message.data, output_message.len);
02015 }
02016 
/*
 * This isn't currently used for anything. Monitoring tools might be
 * interested in it in the future, and something like this will be needed
 * for synchronous replication.
 */
#ifdef NOT_USED
/*
 * Returns the oldest send position among walsenders, or InvalidXLogRecPtr
 * if there is none.
 *
 * Slots with a zero pid, and slots whose sent position is still invalid,
 * are ignored.  Each slot's position is read under that slot's spinlock.
 */
XLogRecPtr
GetOldestWALSendPointer(void)
{
    XLogRecPtr  oldest = InvalidXLogRecPtr;
    int         i;
    bool        found = false;

    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
        XLogRecPtr  recptr;

        if (walsnd->pid == 0)
            continue;

        SpinLockAcquire(&walsnd->mutex);
        recptr = walsnd->sentPtr;
        SpinLockRelease(&walsnd->mutex);

        /* XLogRecPtr is a plain 64-bit integer; zero means "not set" */
        if (XLogRecPtrIsInvalid(recptr))
            continue;

        if (!found || recptr < oldest)
            oldest = recptr;
        found = true;
    }
    return oldest;
}

#endif