Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Defines | Typedefs | Enumerations | Functions | Variables

walreceiver.h File Reference

#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
Include dependency graph for walreceiver.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  WalRcvData

Defines

#define MAXCONNINFO   1024
#define AllowCascadeReplication()   (EnableHotStandby && max_wal_senders > 0)

Typedefs

typedef void(* walrcv_connect_type )(char *conninfo)
typedef void(* walrcv_identify_system_type )(TimeLineID *primary_tli)
typedef void(* walrcv_readtimelinehistoryfile_type )(TimeLineID tli, char **filename, char **content, int *size)
typedef bool(* walrcv_startstreaming_type )(TimeLineID tli, XLogRecPtr startpoint)
typedef void(* walrcv_endstreaming_type )(TimeLineID *next_tli)
typedef int(* walrcv_receive_type )(int timeout, char **buffer)
typedef void(* walrcv_send_type )(const char *buffer, int nbytes)
typedef void(* walrcv_disconnect_type )(void)

Enumerations

enum  WalRcvState {
  WALRCV_STOPPED, WALRCV_STARTING, WALRCV_STREAMING, WALRCV_WAITING,
  WALRCV_RESTARTING, WALRCV_STOPPING
}

Functions

void WalReceiverMain (void) __attribute__((noreturn))
Size WalRcvShmemSize (void)
void WalRcvShmemInit (void)
void ShutdownWalRcv (void)
bool WalRcvStreaming (void)
bool WalRcvRunning (void)
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
XLogRecPtr GetWalRcvWriteRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
int GetReplicationApplyDelay (void)
int GetReplicationTransferLatency (void)

Variables

int wal_receiver_status_interval
int wal_receiver_timeout
bool hot_standby_feedback
WalRcvData * WalRcv
PGDLLIMPORT walrcv_connect_type walrcv_connect
PGDLLIMPORT
walrcv_identify_system_type 
walrcv_identify_system
PGDLLIMPORT
walrcv_readtimelinehistoryfile_type 
walrcv_readtimelinehistoryfile
PGDLLIMPORT
walrcv_startstreaming_type 
walrcv_startstreaming
PGDLLIMPORT
walrcv_endstreaming_type 
walrcv_endstreaming
PGDLLIMPORT walrcv_receive_type walrcv_receive
PGDLLIMPORT walrcv_send_type walrcv_send
PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect

Define Documentation

#define AllowCascadeReplication (  )     (EnableHotStandby && max_wal_senders > 0)

Definition at line 34 of file walreceiver.h.

Referenced by StartupXLOG(), and XLogWalRcvFlush().

#define MAXCONNINFO   1024

Definition at line 31 of file walreceiver.h.

Referenced by libpqrcv_connect(), RequestXLogStreaming(), and WalReceiverMain().


Typedef Documentation

typedef void(* walrcv_connect_type)(char *conninfo)

Definition at line 119 of file walreceiver.h.

typedef void(* walrcv_disconnect_type)(void)

Definition at line 140 of file walreceiver.h.

typedef void(* walrcv_endstreaming_type)(TimeLineID *next_tli)

Definition at line 131 of file walreceiver.h.

typedef void(* walrcv_identify_system_type)(TimeLineID *primary_tli)

Definition at line 122 of file walreceiver.h.

typedef void(* walrcv_readtimelinehistoryfile_type)(TimeLineID tli, char **filename, char **content, int *size)

Definition at line 125 of file walreceiver.h.

typedef int(* walrcv_receive_type)(int timeout, char **buffer)

Definition at line 134 of file walreceiver.h.

typedef void(* walrcv_send_type)(const char *buffer, int nbytes)

Definition at line 137 of file walreceiver.h.

typedef bool(* walrcv_startstreaming_type)(TimeLineID tli, XLogRecPtr startpoint)

Definition at line 128 of file walreceiver.h.


Enumeration Type Documentation

enum WalRcvState

Enumerator:
WALRCV_STOPPED 
WALRCV_STARTING 
WALRCV_STREAMING 
WALRCV_WAITING 
WALRCV_RESTARTING 
WALRCV_STOPPING 

Definition at line 39 of file walreceiver.h.

{
    /* Lifecycle states of the walreceiver, kept in WalRcvData::walRcvState. */
    WALRCV_STOPPED,             /* stopped and mustn't start up again */
    WALRCV_STARTING,            /* launched, but the process hasn't
                                 * initialized yet */
    WALRCV_STREAMING,           /* walreceiver is streaming */
    WALRCV_WAITING,             /* stopped streaming, waiting for orders */
    WALRCV_RESTARTING,          /* asked to restart streaming */
    WALRCV_STOPPING             /* requested to stop, but still running */
} WalRcvState;


Function Documentation

int GetReplicationApplyDelay ( void   ) 

Definition at line 312 of file walreceiverfuncs.c.

References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, NULL, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and TimestampDifference().

Referenced by ProcessWalSndrMessage().

{
    /*
     * Returns the replication apply delay in milliseconds: the time elapsed
     * since replay of the current WAL chunk started.  Returns 0 when replay
     * has caught up with everything received, and -1 when the delay cannot
     * be determined because no chunk replay start time is available.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;

    XLogRecPtr  receivePtr;
    XLogRecPtr  replayPtr;
    TimestampTz chunkReplayStartTime;

    long        secs;
    int         usecs;

    /* Snapshot the received-up-to position under the spinlock. */
    SpinLockAcquire(&walrcv->mutex);
    receivePtr = walrcv->receivedUpto;
    SpinLockRelease(&walrcv->mutex);

    replayPtr = GetXLogReplayRecPtr(NULL);

    /* Everything received has been replayed: there is no delay. */
    if (receivePtr == replayPtr)
        return 0;

    /*
     * A zero start time is treated as "not set yet".  Without this guard the
     * difference below would span years and the millisecond conversion would
     * overflow int.  NOTE(review): assumes GetCurrentChunkReplayStartTime()
     * yields 0 until the first chunk is replayed -- confirm against xlog.c.
     */
    chunkReplayStartTime = GetCurrentChunkReplayStartTime();
    if (chunkReplayStartTime == 0)
        return -1;

    TimestampDifference(chunkReplayStartTime,
                        GetCurrentTimestamp(),
                        &secs, &usecs);

    return (((int) secs * 1000) + (usecs / 1000));
}

int GetReplicationTransferLatency ( void   ) 

Definition at line 344 of file walreceiverfuncs.c.

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, and TimestampDifference().

Referenced by ProcessWalSndrMessage().

{
    /*
     * Returns, in milliseconds, the difference between the send time and the
     * receipt time of the last message recorded in shared memory.
     */

    /* volatile access keeps the compiler from moving loads past the lock */
    volatile WalRcvData *shared = WalRcv;

    TimestampTz sentAt;
    TimestampTz receivedAt;

    long        diffSecs = 0;
    int         diffUsecs = 0;

    /* Take a consistent snapshot of both timestamps. */
    SpinLockAcquire(&shared->mutex);
    sentAt = shared->lastMsgSendTime;
    receivedAt = shared->lastMsgReceiptTime;
    SpinLockRelease(&shared->mutex);

    TimestampDifference(sentAt, receivedAt, &diffSecs, &diffUsecs);

    return ((int) diffSecs * 1000) + (diffUsecs / 1000);
}

XLogRecPtr GetWalRcvWriteRecPtr ( XLogRecPtr *  latestChunkStart,
TimeLineID *  receiveTLI 
)

Definition at line 291 of file walreceiverfuncs.c.

References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, and SpinLockRelease.

Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_xlog_receive_location(), and WaitForWALToBecomeAvailable().

{
    /*
     * Returns the position up to which WAL has been received
     * (WalRcvData::receivedUpto).  If the optional out-parameters are
     * non-NULL they are filled in with the start of the latest chunk and the
     * timeline of the received WAL, read under the same lock.
     */

    /* volatile access keeps the compiler from moving loads past the lock */
    volatile WalRcvData *shared = WalRcv;
    XLogRecPtr  result;

    SpinLockAcquire(&shared->mutex);
    if (latestChunkStart)
        *latestChunkStart = shared->latestChunkStart;
    if (receiveTLI)
        *receiveTLI = shared->receivedTLI;
    result = shared->receivedUpto;
    SpinLockRelease(&shared->mutex);

    return result;
}

void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo 
)

Definition at line 226 of file walreceiverfuncs.c.

References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, now(), NULL, PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegSize.

Referenced by WaitForWALToBecomeAvailable().

{
    /*
     * Ask the walreceiver to start (or restart) streaming from the given
     * position on timeline 'tli', using 'conninfo' as connection string (an
     * empty string is stored when conninfo is NULL).  A stopped walreceiver
     * is launched through the postmaster; a waiting one is woken via its
     * latch.
     */

    /* volatile access keeps the compiler from moving stores past the lock */
    volatile WalRcvData *shared = WalRcv;
    pg_time_t   requestTime = (pg_time_t) time(NULL);
    bool        needLaunch;

    /*
     * Round the start position down to the beginning of its segment.
     * Streaming from mid-segment could create a broken segment (i.e., one
     * with no records in its first part), which might cause trouble later on
     * if the segment is e.g. archived.
     */
    recptr -= recptr % XLogSegSize;

    SpinLockAcquire(&shared->mutex);

    /* A (re)start request only makes sense if it is stopped or waiting */
    Assert(shared->walRcvState == WALRCV_STOPPED ||
           shared->walRcvState == WALRCV_WAITING);

    if (conninfo == NULL)
        shared->conninfo[0] = '\0';
    else
        strlcpy((char *) shared->conninfo, conninfo, MAXCONNINFO);

    /* Stopped means a new process is needed; otherwise just restart it. */
    needLaunch = (shared->walRcvState == WALRCV_STOPPED);
    shared->walRcvState = needLaunch ? WALRCV_STARTING : WALRCV_RESTARTING;
    shared->startTime = requestTime;

    /*
     * On the very first startup of walreceiver, receivedUpto and
     * latestChunkStart start out at the requested start position.
     */
    if (shared->receiveStart == 0)
    {
        shared->receivedUpto = recptr;
        shared->latestChunkStart = recptr;
    }
    shared->receiveStart = recptr;
    shared->receiveStartTLI = tli;

    SpinLockRelease(&shared->mutex);

    if (needLaunch)
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
    else
        SetLatch(&shared->latch);
}

void ShutdownWalRcv ( void   ) 

Definition at line 166 of file walreceiverfuncs.c.

References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

{
    /*
     * Stop the walreceiver, if any, and wait until WalRcvRunning() reports
     * that it is no longer running.
     */

    /* volatile access keeps the compiler from moving accesses past the lock */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState curState;
    pid_t       targetPid = 0;

    /*
     * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     * mode once it's finished, and will also request postmaster to not
     * restart itself.
     */
    SpinLockAcquire(&shared->mutex);
    curState = shared->walRcvState;
    if (curState == WALRCV_STARTING)
    {
        /* Mark it stopped directly; no PID is recorded for signalling. */
        shared->walRcvState = WALRCV_STOPPED;
    }
    else if (curState == WALRCV_STREAMING ||
             curState == WALRCV_WAITING ||
             curState == WALRCV_RESTARTING ||
             curState == WALRCV_STOPPING)
    {
        /* A live process: flag it as stopping and remember whom to signal. */
        if (curState != WALRCV_STOPPING)
            shared->walRcvState = WALRCV_STOPPING;
        targetPid = shared->pid;
    }
    /* WALRCV_STOPPED: nothing to do. */
    SpinLockRelease(&shared->mutex);

    /* Signal walreceiver process if it was still running. */
    if (targetPid != 0)
        kill(targetPid, SIGTERM);

    /*
     * Wait for walreceiver to acknowledge its death by setting state to
     * WALRCV_STOPPED.
     */
    for (;;)
    {
        if (!WalRcvRunning())
            break;

        /*
         * This possibly-long loop needs to handle interrupts of startup
         * process.
         */
        HandleStartupProcInterrupts();

        pg_usleep(100000);      /* 100ms */
    }
}

bool WalRcvRunning ( void   ) 

Definition at line 73 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_STARTING, WALRCV_STOPPED, and WalRcvData::walRcvState.

Referenced by ShutdownWalRcv().

{
    /*
     * Reports whether the walreceiver is in any state other than
     * WALRCV_STOPPED.  As a side effect, a walreceiver that has been in
     * WALRCV_STARTING for longer than WALRCV_STARTUP_TIMEOUT is marked
     * stopped.
     */

    /* volatile access keeps the compiler from moving accesses past the lock */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState curState;
    pg_time_t   launchedAt;

    SpinLockAcquire(&shared->mutex);
    curState = shared->walRcvState;
    launchedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * If it has taken too long for walreceiver to start up, give up. Setting
     * the state to STOPPED ensures that if walreceiver later does start up
     * after all, it will see that it's not supposed to be running and die
     * without doing anything.
     */
    if (curState == WALRCV_STARTING &&
        ((pg_time_t) time(NULL) - launchedAt) > WALRCV_STARTUP_TIMEOUT)
    {
        SpinLockAcquire(&shared->mutex);

        /* Re-check under the lock: the state may have changed meanwhile. */
        if (shared->walRcvState == WALRCV_STARTING)
            curState = shared->walRcvState = WALRCV_STOPPED;

        SpinLockRelease(&shared->mutex);
    }

    return curState != WALRCV_STOPPED;
}

void WalRcvShmemInit ( void   ) 

Definition at line 54 of file walreceiverfuncs.c.

References InitSharedLatch(), WalRcvData::latch, MemSet, WalRcvData::mutex, ShmemInitStruct(), SpinLockInit, WalRcvShmemSize(), and WalRcvData::walRcvState.

Referenced by CreateSharedMemoryAndSemaphores().

{
    /*
     * Attach to the walreceiver shared-memory control structure, setting it
     * up if ShmemInitStruct() reports that it did not exist yet.
     */
    bool        alreadyExists;

    WalRcv = (WalRcvData *)
        ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &alreadyExists);

    if (alreadyExists)
        return;

    /* First time through: zero the struct and set up state, lock and latch */
    MemSet(WalRcv, 0, WalRcvShmemSize());
    WalRcv->walRcvState = WALRCV_STOPPED;
    SpinLockInit(&WalRcv->mutex);
    InitSharedLatch(&WalRcv->latch);
}

Size WalRcvShmemSize ( void   ) 

Definition at line 43 of file walreceiverfuncs.c.

References add_size().

Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().

{
    /*
     * Reports the shared memory needed by the walreceiver: one WalRcvData,
     * accumulated through add_size().
     */
    Size        total = add_size(0, sizeof(WalRcvData));

    return total;
}

bool WalRcvStreaming ( void   ) 

Definition at line 119 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STREAMING, and WalRcvData::walRcvState.

Referenced by WaitForWALToBecomeAvailable().

{
    /*
     * Reports whether the walreceiver is streaming or about to: true in the
     * STREAMING, STARTING and RESTARTING states.  As a side effect, a
     * walreceiver that has been in WALRCV_STARTING for longer than
     * WALRCV_STARTUP_TIMEOUT is marked stopped.
     */

    /* volatile access keeps the compiler from moving accesses past the lock */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState curState;
    pg_time_t   launchedAt;

    SpinLockAcquire(&shared->mutex);
    curState = shared->walRcvState;
    launchedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * If it has taken too long for walreceiver to start up, give up. Setting
     * the state to STOPPED ensures that if walreceiver later does start up
     * after all, it will see that it's not supposed to be running and die
     * without doing anything.
     */
    if (curState == WALRCV_STARTING &&
        ((pg_time_t) time(NULL) - launchedAt) > WALRCV_STARTUP_TIMEOUT)
    {
        SpinLockAcquire(&shared->mutex);

        /* Re-check under the lock: the state may have changed meanwhile. */
        if (shared->walRcvState == WALRCV_STARTING)
            curState = shared->walRcvState = WALRCV_STOPPED;

        SpinLockRelease(&shared->mutex);
    }

    return (curState == WALRCV_STREAMING ||
            curState == WALRCV_STARTING ||
            curState == WALRCV_RESTARTING);
}

void WalReceiverMain ( void   ) 

Definition at line 187 of file walreceiver.c.

References Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, GetCurrentTimestamp(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, LogstreamResult, MAXCONNINFO, WalRcvData::mutex, MyProcPid, NAPTIME_PER_CYCLE, now(), NULL, on_shmem_exit(), OwnLatch(), PANIC, PG_SETMASK, PGC_SIGHUP, WalRcvData::pid, ping_sent, PostmasterIsAlive(), pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResourceOwnerCreate(), SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, sigdelset, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, SpinLockAcquire, SpinLockRelease, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UnBlockSig, wal_receiver_timeout, WalRcv, walrcv_connect, walrcv_disconnect, walrcv_endstreaming, walrcv_identify_system, walrcv_readtimelinehistoryfile, walrcv_receive, WALRCV_RESTARTING, walrcv_send, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), XLogArchiveForceDone(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

Referenced by AuxiliaryProcessMain().

{
    /*
     * Main entry point of the walreceiver process.
     *
     * Registers itself in shared memory, installs signal handlers, loads the
     * libpqwalreceiver hooks and connects to the primary.  It then loops
     * forever: stream WAL on the requested timeline until end-of-WAL, close
     * the current segment, and wait for the startup process to hand over a
     * new start position.  The function does not return.
     */
    char        conninfo[MAXCONNINFO];
    XLogRecPtr  startpoint;
    TimeLineID  startpointTLI;
    TimeLineID  primaryTLI;
    bool        first_stream;

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;
    TimestampTz last_recv_timestamp;
    bool        ping_sent;

    /*
     * WalRcv should be set up already (if we are a backend, we inherit this
     * by fork() or EXEC_BACKEND mechanism from the postmaster).
     */
    Assert(walrcv != NULL);

    /*
     * Mark walreceiver as running in shared memory.
     *
     * Do this as early as possible, so that if we fail later on, we'll set
     * state to STOPPED. If we die before this, the startup process will keep
     * waiting for us to start up, until it times out.
     */
    SpinLockAcquire(&walrcv->mutex);
    Assert(walrcv->pid == 0);
    switch (walrcv->walRcvState)
    {
        case WALRCV_STOPPING:
            /* If we've already been requested to stop, don't start up. */
            walrcv->walRcvState = WALRCV_STOPPED;
            /* fall through */

        case WALRCV_STOPPED:
            SpinLockRelease(&walrcv->mutex);
            proc_exit(1);
            break;

        case WALRCV_STARTING:
            /* The usual case */
            break;

        case WALRCV_WAITING:
        case WALRCV_STREAMING:
        case WALRCV_RESTARTING:
        default:
            /* Shouldn't happen */
            elog(PANIC, "walreceiver still running according to shared memory state");
    }
    /* Advertise our PID so that the startup process can kill us */
    walrcv->pid = MyProcPid;
    walrcv->walRcvState = WALRCV_STREAMING;

    /* Fetch information required to start streaming */
    strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
    startpoint = walrcv->receiveStart;
    startpointTLI = walrcv->receiveStartTLI;

    /* Initialise all three message timestamps to the current time */
    walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();

    SpinLockRelease(&walrcv->mutex);

    /* Arrange to clean up at walreceiver exit */
    on_shmem_exit(WalRcvDie, 0);

    OwnLatch(&walrcv->latch);

    /*
     * If possible, make this process a group leader, so that the postmaster
     * can signal any child processes too.  (walreceiver probably never has
     * any child processes, but for consistency we make all postmaster child
     * processes do this.)
     */
#ifdef HAVE_SETSID
    if (setsid() < 0)
        elog(FATAL, "setsid() failed: %m");
#endif

    /* Properly accept or ignore signals the postmaster might send us */
    pqsignal(SIGHUP, WalRcvSigHupHandler);      /* set flag to read config
                                                 * file */
    pqsignal(SIGINT, SIG_IGN);
    pqsignal(SIGTERM, WalRcvShutdownHandler);   /* request shutdown */
    pqsignal(SIGQUIT, WalRcvQuickDieHandler);   /* hard crash time */
    pqsignal(SIGALRM, SIG_IGN);
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
    pqsignal(SIGUSR2, SIG_IGN);

    /* Reset some signals that are accepted by postmaster but not here */
    pqsignal(SIGCHLD, SIG_DFL);
    pqsignal(SIGTTIN, SIG_DFL);
    pqsignal(SIGTTOU, SIG_DFL);
    pqsignal(SIGCONT, SIG_DFL);
    pqsignal(SIGWINCH, SIG_DFL);

    /* We allow SIGQUIT (quickdie) at all times */
    sigdelset(&BlockSig, SIGQUIT);

    /*
     * Load the libpq-specific functions, then verify that every walrcv_*
     * hook is now set.
     */
    load_file("libpqwalreceiver", false);
    if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
        walrcv_endstreaming == NULL ||
        walrcv_identify_system == NULL ||
        walrcv_readtimelinehistoryfile == NULL ||
        walrcv_receive == NULL || walrcv_send == NULL ||
        walrcv_disconnect == NULL)
        elog(ERROR, "libpqwalreceiver didn't initialize correctly");

    /*
     * Create a resource owner to keep track of our resources (not clear that
     * we need this, but may as well have one).
     */
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

    /* Unblock signals (they were blocked when the postmaster forked us) */
    PG_SETMASK(&UnBlockSig);

    /* Establish the connection to the primary for XLOG streaming */
    EnableWalRcvImmediateExit();
    walrcv_connect(conninfo);
    DisableWalRcvImmediateExit();

    /* Outer loop: one iteration per (re)start of streaming */
    first_stream = true;
    for (;;)
    {
        /*
         * Check that we're connected to a valid server using the
         * IDENTIFY_SYSTEM replication command.
         */
        EnableWalRcvImmediateExit();
        walrcv_identify_system(&primaryTLI);
        DisableWalRcvImmediateExit();

        /*
         * Confirm that the current timeline of the primary is the same or
         * ahead of ours.
         */
        if (primaryTLI < startpointTLI)
            ereport(ERROR,
                    (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
                            primaryTLI, startpointTLI)));

        /*
         * Get any missing history files. We do this always, even when we're
         * not interested in that timeline, so that if we're promoted to become
         * the master later on, we don't select the same timeline that was
         * already used in the current master. This isn't bullet-proof - you'll
         * need some external software to manage your cluster if you need to
         * ensure that a unique timeline id is chosen in every case, but let's
         * avoid the confusion of timeline id collisions where we can.
         */
        WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);

        /*
         * Start streaming.
         *
         * We'll try to start at the requested starting point and timeline,
         * even if it's different from the server's latest timeline. In case
         * we've already reached the end of the old timeline, the server will
         * finish the streaming immediately, and we will go back to await
         * orders from the startup process. If recovery_target_timeline is
         * 'latest', the startup process will scan pg_xlog and find the new
         * history file, bump recovery target timeline, and ask us to restart
         * on the new timeline.
         */
        ThisTimeLineID = startpointTLI;
        if (walrcv_startstreaming(startpointTLI, startpoint))
        {
            bool endofwal = false;

            /* Log differently for the first start and for later restarts */
            if (first_stream)
                ereport(LOG,
                        (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
                                (uint32) (startpoint >> 32), (uint32) startpoint,
                                startpointTLI)));
            else
                ereport(LOG,
                        (errmsg("restarted WAL streaming at %X/%X on timeline %u",
                                (uint32) (startpoint >> 32), (uint32) startpoint,
                                startpointTLI)));
            first_stream = false;

            /* Initialize LogstreamResult and buffers for processing messages */
            LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
            initStringInfo(&reply_message);
            initStringInfo(&incoming_message);

            /* Initialize the last recv timestamp */
            last_recv_timestamp = GetCurrentTimestamp();
            ping_sent = false;

            /* Loop until end-of-streaming or error */
            while (!endofwal)
            {
                char       *buf;
                int         len;

                /*
                 * Emergency bailout if postmaster has died.  This is to avoid
                 * the necessity for manual cleanup of all postmaster children.
                 */
                if (!PostmasterIsAlive())
                    exit(1);

                /*
                 * Exit walreceiver if we're not in recovery. This should not
                 * happen, but cross-check the status here.
                 */
                if (!RecoveryInProgress())
                    ereport(FATAL,
                            (errmsg("cannot continue WAL streaming, recovery has already ended")));

                /* Process any requests or signals received recently */
                ProcessWalRcvInterrupts();

                /* On SIGHUP, reload the configuration and force HS feedback */
                if (got_SIGHUP)
                {
                    got_SIGHUP = false;
                    ProcessConfigFile(PGC_SIGHUP);
                    XLogWalRcvSendHSFeedback(true);
                }

                /*
                 * Wait a while for data to arrive.  As handled below: len > 0
                 * is a received message, len == 0 means nothing arrived, and
                 * len < 0 means the primary ended the stream.
                 */
                len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
                if (len != 0)
                {
                    /*
                     * Process the received data, and any subsequent data we
                     * can read without blocking.
                     */
                    for (;;)
                    {
                        if (len > 0)
                        {
                            /* Something was received from master, so reset timeout */
                            last_recv_timestamp = GetCurrentTimestamp();
                            ping_sent = false;
                            /*
                             * The first byte is passed separately from the
                             * rest of the buffer (presumably the message
                             * type -- confirm in XLogWalRcvProcessMsg).
                             */
                            XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
                        }
                        else if (len == 0)
                            break;
                        else if (len < 0)
                        {
                            ereport(LOG,
                                    (errmsg("replication terminated by primary server"),
                                     errdetail("End of WAL reached on timeline %u at %X/%X",
                                               startpointTLI,
                                               (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
                            endofwal = true;
                            break;
                        }
                        /* Poll again with a zero timeout */
                        len = walrcv_receive(0, &buf);
                    }

                    /* Let the master know that we received some data. */
                    XLogWalRcvSendReply(false, false);

                    /*
                     * If we've written some records, flush them to disk and
                     * let the startup process and primary server know about
                     * them.
                     */
                    XLogWalRcvFlush(false);
                }
                else
                {
                    /*
                     * We didn't receive anything new. If we haven't heard
                     * anything from the server for more than
                     * wal_receiver_timeout / 2, ping the server. Also, if it's
                     * been longer than wal_receiver_status_interval since the
                     * last update we sent, send a status update to the master
                     * anyway, to report any progress in applying WAL.
                     */
                    bool requestReply = false;

                    /*
                     * Check if time since last receive from primary has
                     * reached the configured limit.
                     */
                    if (wal_receiver_timeout > 0)
                    {
                        TimestampTz now = GetCurrentTimestamp();
                        TimestampTz timeout;

                        timeout =
                            TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                        wal_receiver_timeout);

                        if (now >= timeout)
                            ereport(ERROR,
                                    (errmsg("terminating walreceiver due to timeout")));

                        /*
                         * We didn't receive anything new, for half of receiver
                         * replication timeout. Ping the server.  ping_sent
                         * ensures only one ping per quiet period.
                         */
                        if (!ping_sent)
                        {
                            timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                                  (wal_receiver_timeout/2));
                            if (now >= timeout)
                            {
                                requestReply = true;
                                ping_sent = true;
                            }
                        }
                    }

                    XLogWalRcvSendReply(requestReply, requestReply);
                    XLogWalRcvSendHSFeedback(false);
                }
            }

            /*
             * The backend finished streaming. Exit streaming COPY-mode from
             * our side, too.
             */
            EnableWalRcvImmediateExit();
            walrcv_endstreaming(&primaryTLI);
            DisableWalRcvImmediateExit();

            /*
             * If the server had switched to a new timeline that we didn't know
             * about when we began streaming, fetch its timeline history file
             * now.
             */
            WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
        }
        else
            ereport(LOG,
                    (errmsg("primary server contains no more WAL on requested timeline %u",
                            startpointTLI)));

        /*
         * End of WAL reached on the requested timeline. Close the last
         * segment, and wait for new orders from the startup process.
         */
        if (recvFile >= 0)
        {
            char        xlogfname[MAXFNAMELEN];

            XLogWalRcvFlush(false);
            if (close(recvFile) != 0)
                ereport(PANIC,
                        (errcode_for_file_access(),
                         errmsg("could not close log segment %s: %m",
                                XLogFileNameP(recvFileTLI, recvSegNo))));

            /*
             * Create .done file forcibly to prevent the streamed segment from
             * being archived later.
             */
            XLogFileName(xlogfname, recvFileTLI, recvSegNo);
            XLogArchiveForceDone(xlogfname);
        }
        recvFile = -1;

        elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
        /* Updates startpoint/startpointTLI for the next loop iteration */
        WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
    }
    /* not reached */
}


Variable Documentation

bool hot_standby_feedback

Definition at line 69 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback().

int wal_receiver_status_interval

Definition at line 67 of file walreceiver.c.

Referenced by XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().

int wal_receiver_timeout

Definition at line 68 of file walreceiver.c.

Referenced by WalReceiverMain().

walrcv_connect_type walrcv_connect

Definition at line 72 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

walrcv_disconnect_type walrcv_disconnect

Definition at line 79 of file walreceiver.c.

Referenced by _PG_init(), WalRcvDie(), and WalReceiverMain().

walrcv_endstreaming_type walrcv_endstreaming

Definition at line 75 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

walrcv_identify_system_type walrcv_identify_system

Definition at line 73 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile

Definition at line 76 of file walreceiver.c.

Referenced by _PG_init(), WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().

walrcv_receive_type walrcv_receive

Definition at line 77 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().

walrcv_startstreaming_type walrcv_startstreaming

Definition at line 74 of file walreceiver.c.

Referenced by _PG_init(), and WalReceiverMain().