Header And Logo

PostgreSQL
| The world's most advanced open source database.

Defines | Functions | Variables

walreceiver.c File Reference

#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "utils/guc.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiver.c:

Go to the source code of this file.

Defines

#define NAPTIME_PER_CYCLE   100

Functions

static void ProcessWalRcvInterrupts (void)
static void EnableWalRcvImmediateExit (void)
static void DisableWalRcvImmediateExit (void)
static void WalRcvFetchTimeLineHistoryFiles (TimeLineID first, TimeLineID last)
static void WalRcvWaitForStartPosition (XLogRecPtr *startpoint, TimeLineID *startpointTLI)
static void WalRcvDie (int code, Datum arg)
static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len)
static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr)
static void XLogWalRcvFlush (bool dying)
static void XLogWalRcvSendReply (bool force, bool requestReply)
static void XLogWalRcvSendHSFeedback (bool immed)
static void ProcessWalSndrMessage (XLogRecPtr walEnd, TimestampTz sendTime)
static void WalRcvSigHupHandler (SIGNAL_ARGS)
static void WalRcvSigUsr1Handler (SIGNAL_ARGS)
static void WalRcvShutdownHandler (SIGNAL_ARGS)
static void WalRcvQuickDieHandler (SIGNAL_ARGS)
void WalReceiverMain (void)

Variables

int wal_receiver_status_interval
int wal_receiver_timeout
bool hot_standby_feedback
walrcv_connect_type walrcv_connect = NULL
walrcv_identify_system_type walrcv_identify_system = NULL
walrcv_startstreaming_type walrcv_startstreaming = NULL
walrcv_endstreaming_type walrcv_endstreaming = NULL
walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL
walrcv_receive_type walrcv_receive = NULL
walrcv_send_type walrcv_send = NULL
walrcv_disconnect_type walrcv_disconnect = NULL
static int recvFile = -1
static TimeLineID recvFileTLI = 0
static XLogSegNo recvSegNo = 0
static uint32 recvOff = 0
static volatile sig_atomic_t got_SIGHUP = false
static volatile sig_atomic_t got_SIGTERM = false
struct {
   XLogRecPtr   Write
   XLogRecPtr   Flush
} LogstreamResult
static StringInfoData reply_message
static StringInfoData incoming_message
static volatile bool WalRcvImmediateInterruptOK = false

Define Documentation

#define NAPTIME_PER_CYCLE   100

Definition at line 81 of file walreceiver.c.

Referenced by WalReceiverMain().


Function Documentation

static void DisableWalRcvImmediateExit ( void   )  [static]
static void EnableWalRcvImmediateExit ( void   )  [static]
static void ProcessWalRcvInterrupts ( void   )  [static]

Definition at line 153 of file walreceiver.c.

References CHECK_FOR_INTERRUPTS, ereport, errcode(), errmsg(), FATAL, got_SIGTERM, and WalRcvImmediateInterruptOK.

Referenced by DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), and WalReceiverMain().

{
    /*
     * Walreceiver does not use the regular backend interrupt scheme, but
     * CHECK_FOR_INTERRUPTS() is still invoked so that pending signals are
     * actually delivered on Win32.
     */
    CHECK_FOR_INTERRUPTS();

    /* Only a pending SIGTERM requires action here. */
    if (!got_SIGTERM)
        return;

    /*
     * Clear the immediate-exit flag before raising FATAL; this also keeps
     * WalRcvShutdownHandler() from calling back into this function.
     */
    WalRcvImmediateInterruptOK = false;
    ereport(FATAL,
            (errcode(ERRCODE_ADMIN_SHUTDOWN),
             errmsg("terminating walreceiver process due to administrator command")));
}

static void ProcessWalSndrMessage ( XLogRecPtr  walEnd,
TimestampTz  sendTime 
) [static]

Definition at line 1168 of file walreceiver.c.

References DEBUG2, elog, GetCurrentTimestamp(), GetReplicationApplyDelay(), GetReplicationTransferLatency(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latestWalEnd, WalRcvData::latestWalEndTime, log_min_messages, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), and WalRcv.

Referenced by XLogWalRcvProcessMsg().

{
    /*
     * Record in shared memory the WAL end position and send time carried by a
     * message from the walsender, together with the local receipt time, and
     * optionally log replication delay figures at DEBUG2.
     *
     * walEnd:   WAL end position reported by the sender.
     * sendTime: sender-side timestamp of the message.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;

    TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();

    /* Update shared-memory status */
    SpinLockAcquire(&walrcv->mutex);
    /* latestWalEndTime only moves when the reported WAL end advances */
    if (walrcv->latestWalEnd < walEnd)
        walrcv->latestWalEndTime = sendTime;
    walrcv->latestWalEnd = walEnd;
    walrcv->lastMsgSendTime = sendTime;
    walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
    SpinLockRelease(&walrcv->mutex);

    /* Skip the formatting work entirely unless DEBUG2 would be emitted */
    if (log_min_messages <= DEBUG2)
    {
        char       *sendtime;
        char       *receipttime;

        /*
         * timestamptz_to_str() returns a pointer to a static buffer, so two
         * calls inside one argument list would both yield whichever string
         * was formatted last.  Copy each result before formatting the next.
         */
        sendtime = pstrdup(timestamptz_to_str(sendTime));
        receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));

        elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
             sendtime,
             receipttime,
             GetReplicationApplyDelay(),
             GetReplicationTransferLatency());

        pfree(sendtime);
        pfree(receipttime);
    }
}

static void WalRcvDie ( int  code,
Datum  arg 
) [static]

Definition at line 692 of file walreceiver.c.

References Assert, DisownLatch(), WalRcvData::latch, WalRcvData::mutex, MyProcPid, NULL, WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WakeupRecovery(), WalRcv, walrcv_disconnect, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, and XLogWalRcvFlush().

Referenced by WalReceiverMain().

{
    /*
     * on_shmem_exit callback: flush pending WAL, release the latch, mark the
     * walreceiver as stopped in shared memory, drop the connection, and wake
     * the startup process.
     */

    /* volatile so accesses are not reordered around the spinlock */
    volatile WalRcvData *rcv = WalRcv;

    /* Everything received so far must reach disk before we go away. */
    XLogWalRcvFlush(true);

    DisownLatch(&rcv->latch);

    /* Advertise that no walreceiver is running any more. */
    SpinLockAcquire(&rcv->mutex);
    Assert(rcv->walRcvState == WALRCV_STARTING ||
           rcv->walRcvState == WALRCV_STREAMING ||
           rcv->walRcvState == WALRCV_WAITING ||
           rcv->walRcvState == WALRCV_RESTARTING ||
           rcv->walRcvState == WALRCV_STOPPING);
    Assert(rcv->pid == MyProcPid);
    rcv->walRcvState = WALRCV_STOPPED;
    rcv->pid = 0;
    SpinLockRelease(&rcv->mutex);

    /* Close the connection cleanly if libpqwalreceiver installed the hook. */
    if (walrcv_disconnect)
        walrcv_disconnect();

    /* Let the startup process notice promptly that we are gone. */
    WakeupRecovery();
}

static void WalRcvFetchTimeLineHistoryFiles ( TimeLineID  first,
TimeLineID  last 
) [static]

Definition at line 644 of file walreceiver.c.

References DisableWalRcvImmediateExit(), EnableWalRcvImmediateExit(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, existsTimeLineHistory(), LOG, pfree(), TLHistoryFileName, walrcv_readtimelinehistoryfile, and writeTimeLineHistoryFile().

Referenced by WalReceiverMain().

{
    /*
     * Fetch from the primary every timeline history file in [first, last]
     * that is not already present locally, and store it.
     */
    TimeLineID  timeline;

    for (timeline = first; timeline <= last; timeline++)
    {
        char       *fname;
        char       *content;
        int         len;
        char        expectedfname[MAXFNAMELEN];

        /*
         * Timeline 1 never has a history file; also skip timelines whose
         * history file we already have.
         */
        if (timeline == 1 || existsTimeLineHistory(timeline))
            continue;

        ereport(LOG,
                (errmsg("fetching timeline history file for timeline %u from primary server",
                        timeline)));

        /* Bracketed like the other walrcv_* calls in WalReceiverMain(). */
        EnableWalRcvImmediateExit();
        walrcv_readtimelinehistoryfile(timeline, &fname, &content, &len);
        DisableWalRcvImmediateExit();

        /*
         * Sanity check: the name the primary sent must equal the one we
         * compute ourselves for this timeline.
         */
        TLHistoryFileName(expectedfname, timeline);
        if (strcmp(fname, expectedfname) != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u",
                                     timeline)));

        /* Store the received history file in pg_xlog. */
        writeTimeLineHistoryFile(timeline, content, len);

        pfree(fname);
        pfree(content);
    }
}

static void WalRcvQuickDieHandler ( SIGNAL_ARGS   )  [static]

Definition at line 760 of file walreceiver.c.

References BlockSig, on_exit_reset(), and PG_SETMASK.

Referenced by WalReceiverMain().

{
    /*
     * SIGQUIT handler (installed in WalReceiverMain()): terminate at once
     * without any cleanup.
     *
     * First switch to the BlockSig mask.  WalReceiverMain() removed SIGQUIT
     * from that set, so SIGQUIT itself is not among the signals blocked here.
     */
    PG_SETMASK(&BlockSig);

    /*
     * We DO NOT want to run proc_exit() callbacks -- we're here because
     * shared memory may be corrupted, so we don't want to try to clean up our
     * transaction.  Just nail the windows shut and get out of town.  Now that
     * there's an atexit callback to prevent third-party code from breaking
     * things by calling exit() directly, we have to reset the callbacks
     * explicitly to make this work as intended.
     */
    on_exit_reset();

    /*
     * Note we do exit(2) not exit(0).  This is to force the postmaster into a
     * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
     * backend.  This is necessary precisely because we don't clean up our
     * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
     * should ensure the postmaster sees this as a crash, too, but no harm in
     * being doubly sure.)
     */
    exit(2);
}

static void WalRcvShutdownHandler ( SIGNAL_ARGS   )  [static]

Definition at line 738 of file walreceiver.c.

References got_SIGTERM, WalRcvData::latch, proc_exit_inprogress, ProcessWalRcvInterrupts(), SetLatch(), WalRcv, and WalRcvImmediateInterruptOK.

Referenced by WalReceiverMain().

{
    /*
     * SIGTERM handler: note the shutdown request, wake the main loop, and
     * exit right away if we are inside an immediate-exit section.
     */

    /* A signal handler must leave errno exactly as it found it. */
    int         saved_errno = errno;

    /* ProcessWalRcvInterrupts() acts on this flag. */
    got_SIGTERM = true;

    /* Wake the process if it is sleeping on the walreceiver latch. */
    SetLatch(&WalRcv->latch);

    /*
     * Only exit from inside the handler when an immediate-exit section is
     * active, and never while proc_exit() is already in progress.
     */
    if (WalRcvImmediateInterruptOK && !proc_exit_inprogress)
        ProcessWalRcvInterrupts();

    errno = saved_errno;
}

static void WalRcvSigHupHandler ( SIGNAL_ARGS   )  [static]

Definition at line 723 of file walreceiver.c.

References got_SIGHUP.

Referenced by WalReceiverMain().

{
    /*
     * SIGHUP handler: only set a flag.  The streaming loop in
     * WalReceiverMain() checks it and calls ProcessConfigFile() outside
     * signal context.
     */
    got_SIGHUP = true;
}

static void WalRcvSigUsr1Handler ( SIGNAL_ARGS   )  [static]

Definition at line 731 of file walreceiver.c.

References latch_sigusr1_handler().

Referenced by WalReceiverMain().

static void WalRcvWaitForStartPosition ( XLogRecPtr *  startpoint,
TimeLineID *  startpointTLI 
) [static]

Definition at line 559 of file walreceiver.c.

References Assert, elog, FATAL, WalRcvData::latch, WalRcvData::mutex, PostmasterIsAlive(), proc_exit(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, ResetLatch(), set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, update_process_title, WaitLatch(), WakeupRecovery(), WalRcv, WALRCV_RESTARTING, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvData::walRcvState, WL_LATCH_SET, and WL_POSTMASTER_DEATH.

Referenced by WalReceiverMain().

{
    /*
     * Park the walreceiver after a stream has ended and wait for the next
     * start position.
     *
     * On entry the shared state must be WALRCV_STREAMING.  It is switched to
     * WALRCV_WAITING and receiveStart/receiveStartTLI are cleared.  The
     * function then sleeps on the latch until the state becomes
     * WALRCV_RESTARTING.  At that point the requested position and timeline
     * are copied to *startpoint and *startpointTLI, the state is set back to
     * WALRCV_STREAMING, and the function returns.  If the state becomes
     * WALRCV_STOPPING instead, the process exits.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;
    int         state;

    /*
     * Leave STREAMING for WAITING.  If a stop was already requested, exit
     * cleanly; any other state here is unexpected.
     */
    SpinLockAcquire(&walrcv->mutex);
    state = walrcv->walRcvState;
    if (state != WALRCV_STREAMING)
    {
        SpinLockRelease(&walrcv->mutex);
        if (state == WALRCV_STOPPING)
            proc_exit(0);
        else
            elog(FATAL, "unexpected walreceiver state");
    }
    walrcv->walRcvState = WALRCV_WAITING;
    walrcv->receiveStart = InvalidXLogRecPtr;
    walrcv->receiveStartTLI = 0;
    SpinLockRelease(&walrcv->mutex);

    if (update_process_title)
        set_ps_display("idle", false);

    /*
     * nudge startup process to notice that we've stopped streaming and are
     * now waiting for instructions.
     */
    WakeupRecovery();
    for (;;)
    {
        /*
         * Reset the latch before examining the state, so that a SetLatch()
         * arriving after the checks below still wakes the WaitLatch() call
         * at the bottom of the loop.
         */
        ResetLatch(&walrcv->latch);

        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
         * necessity for manual cleanup of all postmaster children.
         */
        if (!PostmasterIsAlive())
            exit(1);

        /* Raises FATAL if SIGTERM has been received. */
        ProcessWalRcvInterrupts();

        SpinLockAcquire(&walrcv->mutex);
        Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
               walrcv->walRcvState == WALRCV_WAITING ||
               walrcv->walRcvState == WALRCV_STOPPING);
        if (walrcv->walRcvState == WALRCV_RESTARTING)
        {
            /* we don't expect primary_conninfo to change */
            *startpoint = walrcv->receiveStart;
            *startpointTLI = walrcv->receiveStartTLI;
            walrcv->walRcvState = WALRCV_STREAMING;
            SpinLockRelease(&walrcv->mutex);
            break;
        }
        if (walrcv->walRcvState == WALRCV_STOPPING)
        {
            /*
             * We should've received SIGTERM if the startup process wants
             * us to die, but might as well check it here too.
             */
            SpinLockRelease(&walrcv->mutex);
            exit(1);
        }
        SpinLockRelease(&walrcv->mutex);

        /* Still WAITING: sleep until the latch is set or the postmaster dies. */
        WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
    }

    /* Show the new starting position in the process title. */
    if (update_process_title)
    {
        char        activitymsg[50];

        snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
                 (uint32) (*startpoint >> 32),
                 (uint32) *startpoint);
        set_ps_display(activitymsg, false);
    }
}

void WalReceiverMain ( void   ) 

Definition at line 187 of file walreceiver.c.

References Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, GetCurrentTimestamp(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, LogstreamResult, MAXCONNINFO, WalRcvData::mutex, MyProcPid, NAPTIME_PER_CYCLE, now(), NULL, on_shmem_exit(), OwnLatch(), PANIC, PG_SETMASK, PGC_SIGHUP, WalRcvData::pid, ping_sent, PostmasterIsAlive(), pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResourceOwnerCreate(), SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, sigdelset, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, SpinLockAcquire, SpinLockRelease, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UnBlockSig, wal_receiver_timeout, WalRcv, walrcv_connect, walrcv_disconnect, walrcv_endstreaming, walrcv_identify_system, walrcv_readtimelinehistoryfile, walrcv_receive, WALRCV_RESTARTING, walrcv_send, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), XLogArchiveForceDone(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

{
    /*
     * Main entry point of the walreceiver process (referenced from
     * AuxiliaryProcessMain()).
     *
     * Setup:
     *   - register in shared memory;
     *   - install signal handlers;
     *   - load libpqwalreceiver;
     *   - connect to the primary.
     *
     * The function then loops forever: stream WAL until the primary ends the
     * stream, close the current segment, and wait for the startup process to
     * supply a new start position.  It never returns; the process leaves via
     * proc_exit()/exit() or an ERROR/FATAL report.
     */
    char        conninfo[MAXCONNINFO];
    XLogRecPtr  startpoint;
    TimeLineID  startpointTLI;
    TimeLineID  primaryTLI;
    bool        first_stream;

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;
    TimestampTz last_recv_timestamp;
    bool        ping_sent;

    /*
     * WalRcv should be set up already (if we are a backend, we inherit this
     * by fork() or EXEC_BACKEND mechanism from the postmaster).
     */
    Assert(walrcv != NULL);

    /*
     * Mark walreceiver as running in shared memory.
     *
     * Do this as early as possible, so that if we fail later on, we'll set
     * state to STOPPED. If we die before this, the startup process will keep
     * waiting for us to start up, until it times out.
     */
    SpinLockAcquire(&walrcv->mutex);
    Assert(walrcv->pid == 0);
    switch (walrcv->walRcvState)
    {
        case WALRCV_STOPPING:
            /* If we've already been requested to stop, don't start up. */
            walrcv->walRcvState = WALRCV_STOPPED;
            /* fall through */

        case WALRCV_STOPPED:
            SpinLockRelease(&walrcv->mutex);
            proc_exit(1);
            break;

        case WALRCV_STARTING:
            /* The usual case */
            break;

        case WALRCV_WAITING:
        case WALRCV_STREAMING:
        case WALRCV_RESTARTING:
        default:
            /* Shouldn't happen */
            elog(PANIC, "walreceiver still running according to shared memory state");
    }
    /* Advertise our PID so that the startup process can kill us */
    walrcv->pid = MyProcPid;
    walrcv->walRcvState = WALRCV_STREAMING;

    /* Fetch information required to start streaming */
    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
    startpoint = walrcv->receiveStart;
    startpointTLI = walrcv->receiveStartTLI;

    /* Initialise to a sanish value */
    walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();

    SpinLockRelease(&walrcv->mutex);

    /* Arrange to clean up at walreceiver exit */
    on_shmem_exit(WalRcvDie, 0);

    OwnLatch(&walrcv->latch);

    /*
     * If possible, make this process a group leader, so that the postmaster
     * can signal any child processes too.  (walreceiver probably never has
     * any child processes, but for consistency we make all postmaster child
     * processes do this.)
     */
#ifdef HAVE_SETSID
    if (setsid() < 0)
        elog(FATAL, "setsid() failed: %m");
#endif

    /* Properly accept or ignore signals the postmaster might send us */
    pqsignal(SIGHUP, WalRcvSigHupHandler);      /* set flag to read config
                                                 * file */
    pqsignal(SIGINT, SIG_IGN);
    pqsignal(SIGTERM, WalRcvShutdownHandler);   /* request shutdown */
    pqsignal(SIGQUIT, WalRcvQuickDieHandler);   /* hard crash time */
    pqsignal(SIGALRM, SIG_IGN);
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
    pqsignal(SIGUSR2, SIG_IGN);

    /* Reset some signals that are accepted by postmaster but not here */
    pqsignal(SIGCHLD, SIG_DFL);
    pqsignal(SIGTTIN, SIG_DFL);
    pqsignal(SIGTTOU, SIG_DFL);
    pqsignal(SIGCONT, SIG_DFL);
    pqsignal(SIGWINCH, SIG_DFL);

    /* We allow SIGQUIT (quickdie) at all times */
    sigdelset(&BlockSig, SIGQUIT);

    /* Load the libpq-specific functions */
    load_file("libpqwalreceiver", false);
    if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
        walrcv_endstreaming == NULL ||
        walrcv_identify_system == NULL ||
        walrcv_readtimelinehistoryfile == NULL ||
        walrcv_receive == NULL || walrcv_send == NULL ||
        walrcv_disconnect == NULL)
        elog(ERROR, "libpqwalreceiver didn't initialize correctly");

    /*
     * Create a resource owner to keep track of our resources (not clear that
     * we need this, but may as well have one).
     */
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

    /* Unblock signals (they were blocked when the postmaster forked us) */
    PG_SETMASK(&UnBlockSig);

    /* Establish the connection to the primary for XLOG streaming */
    EnableWalRcvImmediateExit();
    walrcv_connect(conninfo);
    DisableWalRcvImmediateExit();

    first_stream = true;
    for (;;)
    {
        /*
         * Check that we're connected to a valid server using the
         * IDENTIFY_SYSTEM replication command.
         */
        EnableWalRcvImmediateExit();
        walrcv_identify_system(&primaryTLI);
        DisableWalRcvImmediateExit();

        /*
         * Confirm that the current timeline of the primary is the same or
         * ahead of ours.
         */
        if (primaryTLI < startpointTLI)
            ereport(ERROR,
                    (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
                            primaryTLI, startpointTLI)));

        /*
         * Get any missing history files. We do this always, even when we're
         * not interested in that timeline, so that if we're promoted to become
         * the master later on, we don't select the same timeline that was
         * already used in the current master. This isn't bullet-proof - you'll
         * need some external software to manage your cluster if you need to
         * ensure that a unique timeline id is chosen in every case, but let's
         * avoid the confusion of timeline id collisions where we can.
         */
        WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);

        /*
         * Start streaming.
         *
         * We'll try to start at the requested starting point and timeline,
         * even if it's different from the server's latest timeline. In case
         * we've already reached the end of the old timeline, the server will
         * finish the streaming immediately, and we will go back to await
         * orders from the startup process. If recovery_target_timeline is
         * 'latest', the startup process will scan pg_xlog and find the new
         * history file, bump recovery target timeline, and ask us to restart
         * on the new timeline.
         */
        ThisTimeLineID = startpointTLI;
        if (walrcv_startstreaming(startpointTLI, startpoint))
        {
            bool endofwal = false;

            if (first_stream)
                ereport(LOG,
                        (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
                                (uint32) (startpoint >> 32), (uint32) startpoint,
                                startpointTLI)));
            else
                ereport(LOG,
                        (errmsg("restarted WAL streaming at %X/%X on timeline %u",
                                (uint32) (startpoint >> 32), (uint32) startpoint,
                                startpointTLI)));
            first_stream = false;

            /* Initialize LogstreamResult and buffers for processing messages */
            LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);

            /*
             * The message buffers are file-level statics, so their .data is
             * NULL until first initialized.  Allocate them on the first
             * streaming pass only.  On later passes (restarts, timeline
             * switches), just empty them.  Calling initStringInfo() again
             * would palloc fresh buffers and orphan the old ones.
             */
            if (reply_message.data == NULL)
                initStringInfo(&reply_message);
            else
                resetStringInfo(&reply_message);
            if (incoming_message.data == NULL)
                initStringInfo(&incoming_message);
            else
                resetStringInfo(&incoming_message);

            /* Initialize the last recv timestamp */
            last_recv_timestamp = GetCurrentTimestamp();
            ping_sent = false;

            /* Loop until end-of-streaming or error */
            while (!endofwal)
            {
                char       *buf;
                int         len;

                /*
                 * Emergency bailout if postmaster has died.  This is to avoid
                 * the necessity for manual cleanup of all postmaster children.
                 */
                if (!PostmasterIsAlive())
                    exit(1);

                /*
                 * Exit walreceiver if we're not in recovery. This should not
                 * happen, but cross-check the status here.
                 */
                if (!RecoveryInProgress())
                    ereport(FATAL,
                            (errmsg("cannot continue WAL streaming, recovery has already ended")));

                /* Process any requests or signals received recently */
                ProcessWalRcvInterrupts();

                if (got_SIGHUP)
                {
                    got_SIGHUP = false;
                    ProcessConfigFile(PGC_SIGHUP);
                    XLogWalRcvSendHSFeedback(true);
                }

                /* Wait a while for data to arrive */
                len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
                if (len != 0)
                {
                    /*
                     * Process the received data, and any subsequent data we
                     * can read without blocking.
                     */
                    for (;;)
                    {
                        if (len > 0)
                        {
                            /* Something was received from master, so reset timeout */
                            last_recv_timestamp = GetCurrentTimestamp();
                            ping_sent = false;
                            XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
                        }
                        else if (len == 0)
                            break;
                        else if (len < 0)
                        {
                            ereport(LOG,
                                    (errmsg("replication terminated by primary server"),
                                     errdetail("End of WAL reached on timeline %u at %X/%X",
                                               startpointTLI,
                                               (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
                            endofwal = true;
                            break;
                        }
                        len = walrcv_receive(0, &buf);
                    }

                    /* Let the master know that we received some data. */
                    XLogWalRcvSendReply(false, false);

                    /*
                     * If we've written some records, flush them to disk and
                     * let the startup process and primary server know about
                     * them.
                     */
                    XLogWalRcvFlush(false);
                }
                else
                {
                    /*
                     * We didn't receive anything new. If we haven't heard
                     * anything from the server for more than
                     * wal_receiver_timeout / 2, ping the server. Also, if it's
                     * been longer than wal_receiver_status_interval since the
                     * last update we sent, send a status update to the master
                     * anyway, to report any progress in applying WAL.
                     */
                    bool requestReply = false;

                    /*
                     * Check if time since last receive from primary has
                     * reached the configured limit.
                     */
                    if (wal_receiver_timeout > 0)
                    {
                        TimestampTz now = GetCurrentTimestamp();
                        TimestampTz timeout;

                        timeout =
                            TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                        wal_receiver_timeout);

                        if (now >= timeout)
                            ereport(ERROR,
                                    (errmsg("terminating walreceiver due to timeout")));

                        /*
                         * We didn't receive anything new, for half of receiver
                         * replication timeout. Ping the server.
                         */
                        if (!ping_sent)
                        {
                            timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                                  (wal_receiver_timeout/2));
                            if (now >= timeout)
                            {
                                requestReply = true;
                                ping_sent = true;
                            }
                        }
                    }

                    XLogWalRcvSendReply(requestReply, requestReply);
                    XLogWalRcvSendHSFeedback(false);
                }
            }

            /*
             * The backend finished streaming. Exit streaming COPY-mode from
             * our side, too.
             */
            EnableWalRcvImmediateExit();
            walrcv_endstreaming(&primaryTLI);
            DisableWalRcvImmediateExit();

            /*
             * If the server had switched to a new timeline that we didn't know
             * about when we began streaming, fetch its timeline history file
             * now.
             */
            WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
        }
        else
            ereport(LOG,
                    (errmsg("primary server contains no more WAL on requested timeline %u",
                            startpointTLI)));

        /*
         * End of WAL reached on the requested timeline. Close the last
         * segment, and await new orders from the startup process.
         */
        if (recvFile >= 0)
        {
            char        xlogfname[MAXFNAMELEN];

            XLogWalRcvFlush(false);
            if (close(recvFile) != 0)
                ereport(PANIC,
                        (errcode_for_file_access(),
                         errmsg("could not close log segment %s: %m",
                                XLogFileNameP(recvFileTLI, recvSegNo))));

            /*
             * Create .done file forcibly to prevent the streamed segment from
             * being archived later.
             */
            XLogFileName(xlogfname, recvFileTLI, recvSegNo);
            XLogArchiveForceDone(xlogfname);
        }
        recvFile = -1;

        elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
        WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
    }
    /* not reached */
}

static void XLogWalRcvFlush ( bool  dying  )  [static]

Definition at line 964 of file walreceiver.c.

References AllowCascadeReplication, issue_xlog_fsync(), WalRcvData::latestChunkStart, LogstreamResult, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, recvFile, recvSegNo, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, ThisTimeLineID, update_process_title, WakeupRecovery(), WalRcv, WalSndWakeup(), and XLogWalRcvSendReply().

Referenced by WalRcvDie(), WalReceiverMain(), and XLogWalRcvWrite().

{
    /*
     * fsync any WAL written but not yet flushed, publish the new flush
     * position in shared memory, and wake everyone waiting on it.
     *
     * dying: true when called from WalRcvDie(); suppresses the reply to the
     * primary.
     */
    volatile WalRcvData *rcv;

    /* Flush position already caught up with write position: nothing to do. */
    if (LogstreamResult.Flush >= LogstreamResult.Write)
        return;

    /* volatile pointer keeps shared-memory accesses inside the spinlock */
    rcv = WalRcv;

    issue_xlog_fsync(recvFile, recvSegNo);
    LogstreamResult.Flush = LogstreamResult.Write;

    /* Publish the new flush position. */
    SpinLockAcquire(&rcv->mutex);
    if (rcv->receivedUpto < LogstreamResult.Flush)
    {
        rcv->latestChunkStart = rcv->receivedUpto;
        rcv->receivedUpto = LogstreamResult.Flush;
        rcv->receivedTLI = ThisTimeLineID;
    }
    SpinLockRelease(&rcv->mutex);

    /* New WAL is durable: wake the startup process, and walsenders if cascading. */
    WakeupRecovery();
    if (AllowCascadeReplication())
        WalSndWakeup();

    /* Show streaming progress in the process title. */
    if (update_process_title)
    {
        char        psmsg[50];

        snprintf(psmsg, sizeof(psmsg), "streaming %X/%X",
                 (uint32) (LogstreamResult.Write >> 32),
                 (uint32) LogstreamResult.Write);
        set_ps_display(psmsg, false);
    }

    /* Tell the primary about our progress too, unless we are exiting. */
    if (!dying)
        XLogWalRcvSendReply(false, false);
}

static void XLogWalRcvProcessMsg (unsigned char type, char *buf, Size len) [static]

Definition at line 789 of file walreceiver.c.

References appendBinaryStringInfo(), ereport, errcode(), errmsg_internal(), ERROR, IntegerTimestampToTimestampTz(), pq_getmsgbyte(), pq_getmsgint64(), ProcessWalSndrMessage(), resetStringInfo(), XLogWalRcvSendReply(), and XLogWalRcvWrite().

Referenced by WalReceiverMain().

{
    /*
     * Dispatch one message received from the primary.
     *
     * type:    the message type byte.
     * buf/len: the rest of the message.
     *
     * Accepted types:
     *   'w' - a header (dataStart, walEnd, sendTime) followed by WAL bytes,
     *         which are handed to XLogWalRcvWrite().
     *   'k' - a keepalive (walEnd, sendTime, replyRequested) that may trigger
     *         an immediate reply.
     *
     * A message of the wrong size, or of an unknown type, raises an ERROR
     * with ERRCODE_PROTOCOL_VIOLATION.  Only the fixed-size header is copied
     * into incoming_message, where the pq_getmsg* routines parse it.
     */
    int         headerSize;
    XLogRecPtr  msgDataStart;
    XLogRecPtr  msgWalEnd;
    TimestampTz msgSendTime;
    bool        wantReply;

    resetStringInfo(&incoming_message);

    if (type == 'w')            /* WAL records */
    {
        /* dataStart + walEnd + sendTime: three int64 fields */
        headerSize = 3 * sizeof(int64);
        if (len < headerSize)
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg_internal("invalid WAL message received from primary")));
        appendBinaryStringInfo(&incoming_message, buf, headerSize);

        msgDataStart = pq_getmsgint64(&incoming_message);
        msgWalEnd = pq_getmsgint64(&incoming_message);
        msgSendTime = IntegerTimestampToTimestampTz(pq_getmsgint64(&incoming_message));
        ProcessWalSndrMessage(msgWalEnd, msgSendTime);

        /* Everything after the header is WAL that starts at msgDataStart. */
        XLogWalRcvWrite(buf + headerSize, len - headerSize, msgDataStart);
    }
    else if (type == 'k')       /* Keepalive */
    {
        /* walEnd + sendTime (int64 each) + replyRequested (one byte) */
        headerSize = 2 * sizeof(int64) + sizeof(char);
        if (len != headerSize)
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg_internal("invalid keepalive message received from primary")));
        appendBinaryStringInfo(&incoming_message, buf, headerSize);

        msgWalEnd = pq_getmsgint64(&incoming_message);
        msgSendTime = IntegerTimestampToTimestampTz(pq_getmsgint64(&incoming_message));
        wantReply = pq_getmsgbyte(&incoming_message);

        ProcessWalSndrMessage(msgWalEnd, msgSendTime);

        /* The primary asked for a reply: send one right away. */
        if (wantReply)
            XLogWalRcvSendReply(true, false);
    }
    else
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("invalid replication message type %d",
                                 type)));
}

static void XLogWalRcvSendHSFeedback (bool immed) [static]

Definition at line 1087 of file walreceiver.c.

References Assert, StringInfoData::data, DEBUG2, elog, GetCurrentIntegerTimestamp(), GetCurrentTimestamp(), GetNextXidAndEpoch(), GetOldestXmin(), hot_standby_feedback, HotStandbyActive(), StringInfoData::len, pq_sendbyte(), pq_sendint(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), TransactionIdIsValid, wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain().

{
    /*
     * Send a hot standby feedback message ('h') to the primary.  It carries
     * the current integer timestamp, the standby's xmin and the epoch that
     * xmin belongs to.
     *
     * immed: when false, sending is rate-limited by
     *        wal_receiver_status_interval; when true, that limit is skipped.
     *
     * Sending normally stops once feedback is disabled (hot_standby_feedback
     * off or wal_receiver_status_interval <= 0).  There is one exception:
     * while master_has_standby_xmin is set, meaning the last message we sent
     * carried a valid xmin, we keep going.  With hot_standby_feedback off
     * the next message then carries InvalidTransactionId, which clears the
     * flag.
     */
    TimestampTz now;
    TransactionId nextXid;
    uint32      nextEpoch;
    TransactionId xmin;
    static TimestampTz sendTime = 0;    /* last rate-limited send time */
    static bool master_has_standby_xmin = false;    /* last message had a valid xmin */

    /* Feedback disabled and nothing previously reported: do nothing at all. */
    if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
        !master_has_standby_xmin)
        return;

    /* Get current timestamp. */
    now = GetCurrentTimestamp();

    if (!immed)
    {
        /* Send feedback at most once per wal_receiver_status_interval. */
        if (!TimestampDifferenceExceeds(sendTime, now,
                                        wal_receiver_status_interval * 1000))
            return;
        sendTime = now;
    }

    /*
     * If Hot Standby is not yet active there is nothing to send.  This is
     * checked after the interval test to reduce the number of calls.  The
     * Assert records that no xmin can have been reported in this state.
     */
    if (!HotStandbyActive())
    {
        Assert(!master_has_standby_xmin);
        return;
    }

    /*
     * Only now, with every cheaper check passed, make the expensive
     * GetOldestXmin() call.  With feedback disabled, report
     * InvalidTransactionId instead.
     */
    if (hot_standby_feedback)
        xmin = GetOldestXmin(true, false);
    else
        xmin = InvalidTransactionId;

    /*
     * Get the epoch of nextXid.  If xmin is numerically greater than
     * nextXid, the two lie on different sides of an epoch boundary, so xmin
     * belongs to the previous epoch.
     */
    GetNextXidAndEpoch(&nextXid, &nextEpoch);
    if (nextXid < xmin)
        nextEpoch--;

    elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
         xmin, nextEpoch);

    /* Construct the message and send it. */
    resetStringInfo(&reply_message);
    pq_sendbyte(&reply_message, 'h');
    pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
    pq_sendint(&reply_message, xmin, 4);
    pq_sendint(&reply_message, nextEpoch, 4);
    walrcv_send(reply_message.data, reply_message.len);

    /* Remember whether the message just sent carried a valid xmin. */
    master_has_standby_xmin = TransactionIdIsValid(xmin);
}

static void XLogWalRcvSendReply (bool force, bool requestReply) [static]

Definition at line 1021 of file walreceiver.c.

References StringInfoData::data, DEBUG2, elog, GetCurrentIntegerTimestamp(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), StringInfoData::len, LogstreamResult, NULL, pq_sendbyte(), pq_sendint64(), resetStringInfo(), TimestampDifferenceExceeds(), wal_receiver_status_interval, and walrcv_send.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvProcessMsg().

{
    /*
     * Send a standby status update ('r') to the primary.  It reports the
     * write, flush and apply positions, the current integer timestamp and a
     * "reply requested" flag byte.
     *
     * force:        send even when reporting is disabled
     *               (wal_receiver_status_interval <= 0), nothing has changed,
     *               and the interval has not yet elapsed.
     * requestReply: set the reply-requested byte in the message.
     *
     * The static locals remember what was last sent, so that unchanged
     * positions are not re-reported before the interval expires.
     */
    static XLogRecPtr writePtr = 0;     /* write position last sent */
    static XLogRecPtr flushPtr = 0;     /* flush position last sent */
    XLogRecPtr  applyPtr;
    static TimestampTz sendTime = 0;    /* when the last message was sent */
    TimestampTz now;

    /*
     * If the user doesn't want status to be reported to the master, be sure
     * to exit before doing anything at all (unless the caller forces it).
     */
    if (!force && wal_receiver_status_interval <= 0)
        return;

    /* Get current timestamp. */
    now = GetCurrentTimestamp();

    /*
     * We can compare the write and flush positions to the last message we
     * sent without taking any lock.  The apply position requires a spin
     * lock, so we don't check it unless something else has changed or
     * wal_receiver_status_interval has elapsed since the last message.
     *
     * This means the apply position will appear, from the master's point of
     * view, to lag slightly.  Since it is only for reporting purposes and
     * only matters on idle systems, that's probably OK.
     */
    if (!force
        && writePtr == LogstreamResult.Write
        && flushPtr == LogstreamResult.Flush
        && !TimestampDifferenceExceeds(sendTime, now,
                                       wal_receiver_status_interval * 1000))
        return;
    sendTime = now;

    /* Construct a new message */
    writePtr = LogstreamResult.Write;
    flushPtr = LogstreamResult.Flush;
    applyPtr = GetXLogReplayRecPtr(NULL);

    resetStringInfo(&reply_message);
    pq_sendbyte(&reply_message, 'r');
    pq_sendint64(&reply_message, writePtr);
    pq_sendint64(&reply_message, flushPtr);
    pq_sendint64(&reply_message, applyPtr);
    pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
    pq_sendbyte(&reply_message, requestReply ? 1 : 0);

    /* Log what we are about to send, then send it. */
    elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
         (uint32) (writePtr >> 32), (uint32) writePtr,
         (uint32) (flushPtr >> 32), (uint32) flushPtr,
         (uint32) (applyPtr >> 32), (uint32) applyPtr,
         requestReply ? " (reply requested)" : "");

    walrcv_send(reply_message.data, reply_message.len);
}

static void XLogWalRcvWrite (char *buf, Size nbytes, XLogRecPtr recptr) [static]

Definition at line 858 of file walreceiver.c.

References close, ereport, errcode_for_file_access(), errmsg(), LogstreamResult, PANIC, recvFile, recvFileTLI, recvOff, recvSegNo, ThisTimeLineID, write, XLByteInSeg, XLByteToSeg, XLogArchiveForceDone(), XLogFileInit(), XLogFileName, XLogFileNameP(), XLogSegSize, and XLogWalRcvFlush().

Referenced by XLogWalRcvProcessMsg().

{
    /*
     * Write nbytes of WAL from buf into segment file(s), starting at WAL
     * position recptr and switching segments as needed.
     *
     * This block only write()s.  It calls XLogWalRcvFlush() (which fsyncs)
     * only when leaving a segment.  On return LogstreamResult.Write is the
     * end of the data written.  A close, seek or write failure is reported
     * at PANIC level.
     */
    int         startoff;
    int         byteswritten;

    /*
     * Each iteration writes at most up to the end of the current segment.
     * A short write() simply leaves the rest for the next iteration.
     */
    while (nbytes > 0)
    {
        int         segbytes;

        /* Need a segment: none is open yet, or recptr is outside the open one. */
        if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
        {
            bool        use_existent;

            /*
             * fsync() and close current file before we switch to next one. We
             * would otherwise have to reopen this file to fsync it later
             */
            if (recvFile >= 0)
            {
                char        xlogfname[MAXFNAMELEN];

                XLogWalRcvFlush(false);

                /*
                 * XLOG segment files will be re-read by recovery in startup
                 * process soon, so we don't advise the OS to release cache
                 * pages associated with the file like XLogFileClose() does.
                 */
                if (close(recvFile) != 0)
                    ereport(PANIC,
                            (errcode_for_file_access(),
                             errmsg("could not close log segment %s: %m",
                                    XLogFileNameP(recvFileTLI, recvSegNo))));

                /*
                 * Create .done file forcibly to prevent the streamed segment from
                 * being archived later.
                 */
                XLogFileName(xlogfname, recvFileTLI, recvSegNo);
                XLogArchiveForceDone(xlogfname);
            }
            /* No segment is open while the next one is being opened. */
            recvFile = -1;

            /*
             * Create/use new log file.  NOTE(review): use_existent = true
             * presumably lets XLogFileInit() reuse an existing segment file;
             * confirm against xlog.c.
             */
            XLByteToSeg(recptr, recvSegNo);
            use_existent = true;
            recvFile = XLogFileInit(recvSegNo, &use_existent, true);
            /* This TLI names the segment in error messages and in the .done file. */
            recvFileTLI = ThisTimeLineID;
            recvOff = 0;
        }

        /* Calculate the start offset of the received logs */
        startoff = recptr % XLogSegSize;

        /* Never write past the end of this segment; the rest goes to the next one. */
        if (startoff + nbytes > XLogSegSize)
            segbytes = XLogSegSize - startoff;
        else
            segbytes = nbytes;

        /*
         * Need to seek in the file?  recvOff is the file position after our
         * last write, so seek only when the data does not continue from
         * there.
         */
        if (recvOff != startoff)
        {
            if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
                ereport(PANIC,
                        (errcode_for_file_access(),
                         errmsg("could not seek in log segment %s, to offset %u: %m",
                                XLogFileNameP(recvFileTLI, recvSegNo),
                                startoff)));
            recvOff = startoff;
        }

        /*
         * OK to write the logs.  errno is cleared first so that a failing
         * write() that leaves it unset can be detected below.
         */
        errno = 0;

        byteswritten = write(recvFile, buf, segbytes);
        /* Zero or negative is a failure; a short positive count is handled by the loop. */
        if (byteswritten <= 0)
        {
            /* if write didn't set errno, assume no disk space */
            if (errno == 0)
                errno = ENOSPC;
            ereport(PANIC,
                    (errcode_for_file_access(),
                     errmsg("could not write to log segment %s "
                            "at offset %u, length %lu: %m",
                            XLogFileNameP(recvFileTLI, recvSegNo),
                            recvOff, (unsigned long) segbytes)));
        }

        /* Update state for write: advance past what was actually written. */
        recptr += byteswritten;

        recvOff += byteswritten;
        nbytes -= byteswritten;
        buf += byteswritten;

        /* Publish the new end-of-written-WAL position. */
        LogstreamResult.Write = recptr;
    }
}


Variable Documentation

Definition at line 107 of file walreceiver.c.

volatile sig_atomic_t got_SIGHUP = false [static]

Definition at line 97 of file walreceiver.c.

Referenced by WalRcvSigHupHandler(), and WalReceiverMain().

volatile sig_atomic_t got_SIGTERM = false [static]

Definition at line 98 of file walreceiver.c.

Referenced by ProcessWalRcvInterrupts(), and WalRcvShutdownHandler().

Definition at line 69 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

Definition at line 111 of file walreceiver.c.

struct { ... } LogstreamResult [static]
int recvFile = -1 [static]

Definition at line 88 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

TimeLineID recvFileTLI = 0 [static]

Definition at line 89 of file walreceiver.c.

Referenced by WalReceiverMain(), and XLogWalRcvWrite().

uint32 recvOff = 0 [static]

Definition at line 91 of file walreceiver.c.

Referenced by XLogWalRcvWrite().

XLogSegNo recvSegNo = 0 [static]

Definition at line 90 of file walreceiver.c.

Referenced by WalReceiverMain(), XLogWalRcvFlush(), and XLogWalRcvWrite().

Definition at line 110 of file walreceiver.c.

Definition at line 67 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Definition at line 68 of file walreceiver.c.

Referenced by WalReceiverMain().

Definition at line 72 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

Definition at line 79 of file walreceiver.c.

Referenced by _PG_init(), WalRcvDie(), and WalReceiverMain().

Definition at line 75 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

Definition at line 73 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

Definition at line 76 of file walreceiver.c.

Referenced by _PG_init(), WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

Definition at line 77 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

Definition at line 74 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

volatile bool WalRcvImmediateInterruptOK = false [static]

Definition at line 106 of file walreceiver.c.

Referenced by XLogWrite().