Header And Logo

PostgreSQL
| The world's most advanced open source database.

Defines | Functions | Variables

walsender.c File Reference

#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "replication/basebackup.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Defines

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
#define PG_STAT_GET_WAL_SENDERS_COLS   8

Functions

static void WalSndSigHupHandler (SIGNAL_ARGS)
static void WalSndXLogSendHandler (SIGNAL_ARGS)
static void WalSndLastCycleHandler (SIGNAL_ARGS)
static void WalSndLoop (void)
static void InitWalSenderSlot (void)
static void WalSndKill (int code, Datum arg)
static void XLogSend (bool *caughtup)
static XLogRecPtr GetStandbyFlushRecPtr (void)
static void IdentifySystem (void)
static void StartReplication (StartReplicationCmd *cmd)
static void ProcessStandbyMessage (void)
static void ProcessStandbyReplyMessage (void)
static void ProcessStandbyHSFeedbackMessage (void)
static void ProcessRepliesIfAny (void)
static void WalSndKeepalive (bool requestReply)
void InitWalSender (void)
void WalSndErrorCleanup ()
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
void exec_replication_command (const char *cmd_string)
static void XLogRead (char *buf, XLogRecPtr startptr, Size count)
void WalSndRqstFileReload (void)
void WalSndSignals (void)
Size WalSndShmemSize (void)
void WalSndShmemInit (void)
void WalSndWakeup (void)
void WalSndSetState (WalSndState state)
static const char * WalSndGetStateString (WalSndState state)
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)

Variables

WalSndCtlData * WalSndCtl = NULL
WalSnd * MyWalSnd = NULL
bool am_walsender = false
bool am_cascading_walsender = false
int max_wal_senders = 0
int wal_sender_timeout = 60 * 1000
bool wake_wal_senders = false
static int sendFile = -1
static XLogSegNo sendSegNo = 0
static uint32 sendOff = 0
static TimeLineID curFileTimeLine = 0
static TimeLineID sendTimeLine = 0
static TimeLineID sendTimeLineNextTLI = 0
static bool sendTimeLineIsHistoric = false
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static XLogRecPtr sentPtr = 0
static StringInfoData output_message
static StringInfoData reply_message
static StringInfoData tmpbuf
static TimestampTz last_reply_timestamp
static bool ping_sent = false
static bool streamingDoneSending
static bool streamingDoneReceiving
static volatile sig_atomic_t got_SIGHUP = false
static volatile sig_atomic_t walsender_ready_to_stop = false
static volatile sig_atomic_t replication_active = false

Define Documentation

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 82 of file walsender.c.

Referenced by XLogSend().

#define PG_STAT_GET_WAL_SENDERS_COLS   8

Referenced by pg_stat_get_wal_senders().


Function Documentation

void exec_replication_command ( const char *  cmd_string  ) 

Definition at line 606 of file walsender.c.

References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CurrentMemoryContext, DEBUG1, DestRemote, elog, EndCommand(), ereport, errcode(), errmsg_internal(), ERROR, IdentifySystem(), MemoryContextDelete(), MemoryContextSwitchTo(), replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), StartReplication(), T_BaseBackupCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, TimeLineHistoryCmd::type, and Node::type.

Referenced by PostgresMain().

{
    MemoryContext parse_cxt;
    MemoryContext caller_cxt;
    Node       *parsed;
    int         rc;

    elog(DEBUG1, "received replication command: %s", cmd_string);

    CHECK_FOR_INTERRUPTS();

    /*
     * Parse and run the command inside a private memory context, so that
     * everything allocated while handling it is released in one step below.
     */
    parse_cxt = AllocSetContextCreate(CurrentMemoryContext,
                                      "Replication command context",
                                      ALLOCSET_DEFAULT_MINSIZE,
                                      ALLOCSET_DEFAULT_INITSIZE,
                                      ALLOCSET_DEFAULT_MAXSIZE);
    caller_cxt = MemoryContextSwitchTo(parse_cxt);

    /* Feed the command text to the replication grammar */
    replication_scanner_init(cmd_string);
    rc = replication_yyparse();
    if (rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  rc))));

    /* The parser leaves its output in replication_parse_result */
    parsed = replication_parse_result;

    /* Dispatch on the node tag of the parsed command */
    if (parsed->type == T_IdentifySystemCmd)
        IdentifySystem();
    else if (parsed->type == T_StartReplicationCmd)
        StartReplication((StartReplicationCmd *) parsed);
    else if (parsed->type == T_BaseBackupCmd)
        SendBaseBackup((BaseBackupCmd *) parsed);
    else if (parsed->type == T_TimeLineHistoryCmd)
        SendTimeLineHistory((TimeLineHistoryCmd *) parsed);
    else
        elog(ERROR, "unrecognized replication command node tag: %u",
             parsed->type);

    /* Return to the caller's context and drop the per-command one */
    MemoryContextSwitchTo(caller_cxt);
    MemoryContextDelete(parse_cxt);

    /* Tell the client the command is finished (CommandComplete) */
    EndCommand("SELECT", DestRemote);
}

static XLogRecPtr GetStandbyFlushRecPtr ( void   )  [static]

Definition at line 1622 of file walsender.c.

References GetWalRcvWriteRecPtr(), GetXLogReplayRecPtr(), NULL, receiveTLI, and ThisTimeLineID.

Referenced by IdentifySystem(), StartReplication(), and XLogSend().

{
    XLogRecPtr  streamedUpto;
    TimeLineID  streamedTLI;
    XLogRecPtr  replayedUpto;
    TimeLineID  replayedTLI;

    /*
     * Anything already replayed is safe to send.  In addition, when the
     * walreceiver is streaming on the same timeline we are replaying, WAL it
     * has received but that has not been replayed yet may be sent as well.
     */
    streamedUpto = GetWalRcvWriteRecPtr(NULL, &streamedTLI);
    replayedUpto = GetXLogReplayRecPtr(&replayedTLI);

    /* Side effect: ThisTimeLineID now tracks the replay timeline */
    ThisTimeLineID = replayedTLI;

    /* Fall back to the replay position unless the streamed one is usable */
    if (streamedTLI != ThisTimeLineID || streamedUpto <= replayedUpto)
        return replayedUpto;

    return streamedUpto;
}

static void IdentifySystem ( void   )  [static]

Definition at line 236 of file walsender.c.

References am_cascading_walsender, buf, GetInsertRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), INT4OID, pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint(), pq_sendstring(), RecoveryInProgress(), snprintf(), TEXTOID, and ThisTimeLineID.

Referenced by exec_replication_command().

{
    /*
     * Description of the three result columns: name, type OID and type
     * length as sent in the RowDescription message.
     */
    const struct
    {
        const char *name;
        int         typid;
        int         typlen;
    }           columns[3] =
    {
        {"systemid", TEXTOID, -1},
        {"timeline", INT4OID, 4},
        {"xlogpos", TEXTOID, -1}
    };
    StringInfoData buf;
    char        sysid[32];
    char        tli[11];
    char        xpos[MAXFNAMELEN];
    XLogRecPtr  logptr;
    int         sysid_len;
    int         tli_len;
    int         xpos_len;
    int         i;

    /*
     * The reply is a one-row result set: system identifier, timeline ID and
     * current xlog location, all rendered as text.
     */
    snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
             GetSystemIdentifier());

    /*
     * While in recovery, report the standby flush position (fetching it also
     * refreshes ThisTimeLineID); otherwise report the insert position.
     */
    am_cascading_walsender = RecoveryInProgress();
    if (!am_cascading_walsender)
        logptr = GetInsertRecPtr();
    else
        logptr = GetStandbyFlushRecPtr();

    snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);

    snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);

    /* RowDescription message: one entry per column */
    pq_beginmessage(&buf, 'T');
    pq_sendint(&buf, 3, 2);     /* number of fields */
    for (i = 0; i < 3; i++)
    {
        pq_sendstring(&buf, columns[i].name);   /* col name */
        pq_sendint(&buf, 0, 4);                 /* table oid */
        pq_sendint(&buf, 0, 2);                 /* attnum */
        pq_sendint(&buf, columns[i].typid, 4);  /* type oid */
        pq_sendint(&buf, columns[i].typlen, 2); /* typlen */
        pq_sendint(&buf, 0, 4);                 /* typmod */
        pq_sendint(&buf, 0, 2);                 /* format code */
    }
    pq_endmessage(&buf);

    /* DataRow message: each value is a length word followed by its bytes */
    sysid_len = strlen(sysid);
    tli_len = strlen(tli);
    xpos_len = strlen(xpos);

    pq_beginmessage(&buf, 'D');
    pq_sendint(&buf, 3, 2);     /* # of columns */
    pq_sendint(&buf, sysid_len, 4);
    pq_sendbytes(&buf, sysid, sysid_len);
    pq_sendint(&buf, tli_len, 4);
    pq_sendbytes(&buf, tli, tli_len);
    pq_sendint(&buf, xpos_len, 4);
    pq_sendbytes(&buf, xpos, xpos_len);

    pq_endmessage(&buf);
}

void InitWalSender ( void   ) 

Definition at line 187 of file walsender.c.

References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().

Referenced by PostgresMain().

{
    /*
     * One-time initialization of a walsender process.  The steps below run
     * strictly in this order: record recovery state, claim a shared-memory
     * slot, create a resource owner, then announce ourselves to postmaster.
     */

    /* Remember whether the server was in recovery when we started */
    am_cascading_walsender = RecoveryInProgress();

    /* Create a per-walsender data structure in shared memory */
    InitWalSenderSlot();

    /* Set up resource owner (no parent: it is the top-level owner) */
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");

    /*
     * Let postmaster know that we're a WAL sender. Once we've declared us as
     * a WAL sender process, postmaster will let us outlive the bgwriter and
     * kill us last in the shutdown sequence, so we get a chance to stream all
     * remaining WAL at shutdown, including the shutdown checkpoint. Note that
     * there's no going back, and we mustn't write any WAL records after this.
     */
    MarkPostmasterChildWalSender();
    /* Signal postmaster so it notices the change made just above */
    SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
}

static void InitWalSenderSlot ( void   )  [static]

Definition at line 1133 of file walsender.c.

References Assert, ereport, errcode(), errmsg(), FATAL, i, WalSnd::latch, max_wal_senders, WalSnd::mutex, MyProcPid, NULL, on_shmem_exit(), OwnLatch(), WalSnd::pid, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndKill(), and WalSndCtlData::walsnds.

Referenced by InitWalSender().

{
    int         slotno;

    /*
     * The shared control structure must already exist (it is inherited from
     * the postmaster via fork() or the EXEC_BACKEND mechanism), and this
     * process must not own a slot yet.
     */
    Assert(WalSndCtl != NULL);
    Assert(MyWalSnd == NULL);

    /*
     * Scan the slot array for an unused entry (pid == 0) and claim the first
     * one found.  Each slot is examined under its own spinlock.
     */
    for (slotno = 0; slotno < max_wal_senders; slotno++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *slot = &WalSndCtl->walsnds[slotno];

        SpinLockAcquire(&slot->mutex);

        if (slot->pid == 0)
        {
            /* Free slot: mark it as ours while still holding the lock */
            slot->pid = MyProcPid;
            slot->sentPtr = InvalidXLogRecPtr;
            slot->state = WALSNDSTATE_STARTUP;
            SpinLockRelease(&slot->mutex);

            /* The lock is no longer needed for the remaining setup */
            OwnLatch((Latch *) &slot->latch);
            MyWalSnd = (WalSnd *) slot;
            break;
        }

        /* Slot is taken; release it and look at the next one */
        SpinLockRelease(&slot->mutex);
    }

    /* No slot claimed means every WalSnd structure is in use */
    if (MyWalSnd == NULL)
        ereport(FATAL,
                (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                 errmsg("number of requested standby connections "
                        "exceeds max_wal_senders (currently %d)",
                        max_wal_senders)));

    /* Make sure WalSndKill runs when this process exits */
    on_shmem_exit(WalSndKill, 0);
}

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS   ) 

Definition at line 1838 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, get_call_result_type(), i, Int32GetDatum, IsA, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, palloc(), pfree(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, snprintf(), SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), WalSnd::sync_standby_priority, SyncRepLock, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), values, WalSndGetStateString(), WalSndCtlData::walsnds, WALSNDSTATE_STREAMING, work_mem, WalSnd::write, and XLogRecPtrIsInvalid.

{
#define PG_STAT_GET_WAL_SENDERS_COLS    8
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
    MemoryContext per_query_ctx;
    MemoryContext oldcontext;
    int        *sync_priority;
    int         priority = 0;
    int         sync_standby = -1;
    int         i;

    /*
     * Set-returning function: emits one row per active walsender slot into a
     * tuplestore (materialize mode).  Non-superusers only get the pid column;
     * every other column is NULL for them.
     */

    /* check to see if caller supports us returning a tuplestore */
    if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("set-valued function called in context that cannot accept a set")));
    if (!(rsinfo->allowedModes & SFRM_Materialize))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("materialize mode required, but it is not "
                        "allowed in this context")));

    /* Build a tuple descriptor for our result type */
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");

    /* The tuplestore must live in the per-query context, not ours */
    per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    oldcontext = MemoryContextSwitchTo(per_query_ctx);

    tupstore = tuplestore_begin_heap(true, false, work_mem);
    rsinfo->returnMode = SFRM_Materialize;
    rsinfo->setResult = tupstore;
    rsinfo->setDesc = tupdesc;

    MemoryContextSwitchTo(oldcontext);

    /*
     * Get the priorities of sync standbys all in one go, to minimise lock
     * acquisitions and to allow us to evaluate who is the current sync
     * standby. This code must match the code in SyncRepReleaseWaiters().
     */
    sync_priority = palloc(sizeof(int) * max_wal_senders);
    LWLockAcquire(SyncRepLock, LW_SHARED);
    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

        /*
         * Give every slot a defined priority.  A slot that is free now may
         * be claimed by a new walsender before the second pass below looks
         * at it; without this default that pass would read uninitialized
         * memory.  Such a late arrival is simply reported as asynchronous.
         */
        sync_priority[i] = 0;

        if (walsnd->pid != 0)
        {
            /*
             * Treat a standby such as a pg_basebackup background process
             * which always returns an invalid flush location, as an
             * asynchronous standby.
             */
            if (!XLogRecPtrIsInvalid(walsnd->flush))
                sync_priority[i] = walsnd->sync_standby_priority;

            /* Track the streaming standby with the lowest positive priority */
            if (walsnd->state == WALSNDSTATE_STREAMING &&
                walsnd->sync_standby_priority > 0 &&
                (priority == 0 ||
                 priority > walsnd->sync_standby_priority) &&
                !XLogRecPtrIsInvalid(walsnd->flush))
            {
                priority = walsnd->sync_standby_priority;
                sync_standby = i;
            }
        }
    }
    LWLockRelease(SyncRepLock);

    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
        char        location[MAXFNAMELEN];
        int         pid;
        XLogRecPtr  sentPtr;
        XLogRecPtr  write;
        XLogRecPtr  flush;
        XLogRecPtr  apply;
        WalSndState state;
        Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
        bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];

        /*
         * Take a consistent snapshot of the slot.  The pid is tested and
         * copied under the same spinlock as the other fields, so the row we
         * emit cannot mix the pid of one walsender with the positions of
         * another that reused the slot in between.
         */
        SpinLockAcquire(&walsnd->mutex);
        if (walsnd->pid == 0)
        {
            SpinLockRelease(&walsnd->mutex);
            continue;
        }
        pid = walsnd->pid;
        sentPtr = walsnd->sentPtr;
        state = walsnd->state;
        write = walsnd->write;
        flush = walsnd->flush;
        apply = walsnd->apply;
        SpinLockRelease(&walsnd->mutex);

        memset(nulls, 0, sizeof(nulls));
        values[0] = Int32GetDatum(pid);

        if (!superuser())
        {
            /*
             * Only superusers can see details. Other users only get the pid
             * value to know it's a walsender, but no details.
             */
            MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
        }
        else
        {
            values[1] = CStringGetTextDatum(WalSndGetStateString(state));

            snprintf(location, sizeof(location), "%X/%X",
                     (uint32) (sentPtr >> 32), (uint32) sentPtr);
            values[2] = CStringGetTextDatum(location);

            /*
             * A zero position is reported as NULL; no text datum is built
             * for a column that is NULL anyway.
             */
            if (write == 0)
                nulls[3] = true;
            else
            {
                snprintf(location, sizeof(location), "%X/%X",
                         (uint32) (write >> 32), (uint32) write);
                values[3] = CStringGetTextDatum(location);
            }

            if (flush == 0)
                nulls[4] = true;
            else
            {
                snprintf(location, sizeof(location), "%X/%X",
                         (uint32) (flush >> 32), (uint32) flush);
                values[4] = CStringGetTextDatum(location);
            }

            if (apply == 0)
                nulls[5] = true;
            else
            {
                snprintf(location, sizeof(location), "%X/%X",
                         (uint32) (apply >> 32), (uint32) apply);
                values[5] = CStringGetTextDatum(location);
            }

            values[6] = Int32GetDatum(sync_priority[i]);

            /*
             * More easily understood version of standby state. This is purely
             * informational, not different from priority.
             */
            if (sync_priority[i] == 0)
                values[7] = CStringGetTextDatum("async");
            else if (i == sync_standby)
                values[7] = CStringGetTextDatum("sync");
            else
                values[7] = CStringGetTextDatum("potential");
        }

        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    }
    pfree(sync_priority);

    /* clean up and return the tuplestore */
    tuplestore_donestoring(tupstore);

    return (Datum) 0;
}

static void ProcessRepliesIfAny ( void   )  [static]

Definition at line 670 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_reply_timestamp, NULL, ping_sent, pq_getbyte_if_available(), pq_getmessage(), pq_putmessage_noblock(), proc_exit(), ProcessStandbyMessage(), resetStringInfo(), streamingDoneReceiving, and streamingDoneSending.

Referenced by WalSndLoop().

{
    unsigned char msgtype;
    bool        got_reply = false;

    /* Drain every message that can be read without blocking */
    for (;;)
    {
        int         rc = pq_getbyte_if_available(&msgtype);

        /* Nothing more to read right now */
        if (rc == 0)
            break;

        /* Negative result: unexpected error or EOF on the connection */
        if (rc < 0)
        {
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected EOF on standby connection")));
            proc_exit(0);
        }

        /*
         * Once the frontend has sent CopyDone it must stay quiet until we
         * have closed our side of the COPY as well; only a termination
         * message is tolerated.  XXX: In theory the frontend could already
         * send its next command before seeing our CopyDone, but libpq does
         * not currently allow that.
         */
        if (streamingDoneReceiving && msgtype != 'X')
            ereport(FATAL,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
                            msgtype)));

        /* Only a very small set of message types is expected in this phase */
        if (msgtype == 'd')
        {
            /* CopyData packet wrapping a standby reply */
            ProcessStandbyMessage();
            got_reply = true;
        }
        else if (msgtype == 'c')
        {
            /*
             * CopyDone: the standby wants to stop streaming.  Answer with
             * our own CopyDone unless that has been sent already.
             */
            if (!streamingDoneSending)
            {
                pq_putmessage_noblock('c', NULL, 0);
                streamingDoneSending = true;
            }

            /* Read off the rest of the CopyDone message */
            resetStringInfo(&reply_message);
            if (pq_getmessage(&reply_message, 0))
            {
                ereport(COMMERROR,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("unexpected EOF on standby connection")));
                proc_exit(0);
            }

            streamingDoneReceiving = true;
            got_reply = true;
        }
        else if (msgtype == 'X')
        {
            /* The standby is closing down the socket */
            proc_exit(0);
        }
        else
        {
            ereport(FATAL,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("invalid standby message type \"%c\"",
                            msgtype)));
        }
    }

    /*
     * If at least one message was handled, note when that happened and
     * clear the ping_sent flag.
     */
    if (got_reply)
    {
        last_reply_timestamp = GetCurrentTimestamp();
        ping_sent = false;
    }
}

static void ProcessStandbyHSFeedbackMessage ( void   )  [static]

Definition at line 861 of file walsender.c.

References DEBUG2, elog, GetNextXidAndEpoch(), MyPgXact, pq_getmsgint(), pq_getmsgint64(), TransactionIdIsNormal, TransactionIdPrecedesOrEquals(), and PGXACT::xmin.

Referenced by ProcessStandbyMessage().

{
    TransactionId standbyXmin;
    uint32      standbyEpoch;
    TransactionId ourNextXid;
    uint32      ourEpoch;
    uint32      impliedEpoch;

    /*
     * Unpack the message body; the msgtype byte was already consumed by the
     * caller.  The leading send timestamp is read and discarded.
     */
    (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
    standbyXmin = pq_getmsgint(&reply_message, 4);
    standbyEpoch = pq_getmsgint(&reply_message, 4);

    elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
         standbyXmin,
         standbyEpoch);

    /* A non-normal xmin means "no feedback": clear our advertised xmin */
    if (!TransactionIdIsNormal(standbyXmin))
    {
        MyPgXact->xmin = InvalidTransactionId;
        return;
    }

    /*
     * Sanity-check the reported xmin/epoch pair against our own counter: it
     * must neither lie in the future nor be so old that it has wrapped
     * around.  Insane feedback is silently ignored.
     *
     * If the xmin is numerically at or below our next XID, both sides must
     * be in the same epoch; otherwise the counter has wrapped and our epoch
     * must be exactly one ahead of the standby's.
     */
    GetNextXidAndEpoch(&ourNextXid, &ourEpoch);

    impliedEpoch = (standbyXmin <= ourNextXid) ? standbyEpoch : standbyEpoch + 1;
    if (impliedEpoch != ourEpoch)
        return;

    /* Epoch matches, but the xmin itself may still have wrapped around */
    if (!TransactionIdPrecedesOrEquals(standbyXmin, ourNextXid))
        return;

    /*
     * Advertise the standby's xmin as our own so that GetOldestXmin takes it
     * into account.  That holds back removal of dead rows and thereby avoids
     * cleanup conflicts on the standby.
     *
     * A narrow race remains: nextXid may have advanced after we fetched it,
     * possibly far enough for the xmin to wrap.  The value stored here would
     * then appear to be "in the future" and have no effect, but by then the
     * data could not have been saved anyway.  Provided the standby reports
     * increasing xmins, this can only occur on the first reply cycle, since
     * afterwards our own xmin stops nextXid from advancing that far.
     *
     * ProcArrayLock is deliberately not taken.  Storing the xmin field is
     * assumed atomic and a concurrent GetOldestXmin need not be blocked:
     * moving the xmin forward is obviously safe, and if it moves backwards
     * the data was already at risk because a VACUUM could just have
     * finished calling GetOldestXmin.
     */
    MyPgXact->xmin = standbyXmin;
}

static void ProcessStandbyMessage ( void   )  [static]

Definition at line 770 of file walsender.c.

References COMMERROR, ereport, errcode(), errmsg(), pq_getmessage(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and resetStringInfo().

Referenced by ProcessRepliesIfAny().

{
    char        kind;

    /* Start from an empty buffer, then pull in the message contents */
    resetStringInfo(&reply_message);

    if (pq_getmessage(&reply_message, 0))
    {
        ereport(COMMERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg("unexpected EOF on standby connection")));
        proc_exit(0);
    }

    /* The first payload byte selects the handler */
    kind = pq_getmsgbyte(&reply_message);

    if (kind == 'r')
        ProcessStandbyReplyMessage();
    else if (kind == 'h')
        ProcessStandbyHSFeedbackMessage();
    else
    {
        /* Anything else is a protocol violation: log it and exit */
        ereport(COMMERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg("unexpected message type \"%c\"", kind)));
        proc_exit(0);
    }
}

static void ProcessStandbyReplyMessage ( void   )  [static]

Definition at line 814 of file walsender.c.

References am_cascading_walsender, WalSnd::apply, DEBUG2, elog, WalSnd::flush, WalSnd::mutex, pq_getmsgbyte(), pq_getmsgint64(), SpinLockAcquire, SpinLockRelease, SyncRepReleaseWaiters(), WalSndKeepalive(), and WalSnd::write.

Referenced by ProcessStandbyMessage().

{
    /* use volatile pointer to prevent code rearrangement */
    volatile WalSnd *me;
    XLogRecPtr  written;
    XLogRecPtr  flushed;
    XLogRecPtr  applied;
    bool        wantsReply;

    /*
     * Unpack the reply in wire order; the msgtype byte has already been
     * consumed by the caller.  The send timestamp is read and discarded.
     */
    written = pq_getmsgint64(&reply_message);
    flushed = pq_getmsgint64(&reply_message);
    applied = pq_getmsgint64(&reply_message);
    (void) pq_getmsgint64(&reply_message);  /* sendTime; not used ATM */
    wantsReply = pq_getmsgbyte(&reply_message);

    elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
         (uint32) (written >> 32), (uint32) written,
         (uint32) (flushed >> 32), (uint32) flushed,
         (uint32) (applied >> 32), (uint32) applied,
         wantsReply ? " (reply requested)" : "");

    /* Answer straight away when the standby asked for a reply */
    if (wantsReply)
        WalSndKeepalive(false);

    /*
     * Publish the reported positions in this walsender's shared-memory
     * slot, under the slot's spinlock.
     */
    me = MyWalSnd;

    SpinLockAcquire(&me->mutex);
    me->write = written;
    me->flush = flushed;
    me->apply = applied;
    SpinLockRelease(&me->mutex);

    /* SyncRepReleaseWaiters() is only called by a non-cascading walsender */
    if (!am_cascading_walsender)
        SyncRepReleaseWaiters();
}

static void SendTimeLineHistory ( TimeLineHistoryCmd *  cmd  )  [static]

Definition at line 315 of file walsender.c.

References buf, BYTEAOID, CloseTransientFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, OpenTransientFile(), PG_BINARY, pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint(), pq_sendstring(), read, TEXTOID, TimeLineHistoryCmd::timeline, TLHistoryFileName, and TLHistoryFilePath.

Referenced by exec_replication_command().

{
    StringInfoData buf;
    char        histfname[MAXFNAMELEN];
    char        path[MAXPGPATH];
    int         fd;
    int         fname_len;
    off_t       histfilelen;
    off_t       bytesleft;

    /*
     * Reply with a result set with one row, and two columns. The first col
     * is the name of the history file, 2nd is the contents.
     */

    TLHistoryFileName(histfname, cmd->timeline);
    TLHistoryFilePath(path, cmd->timeline);

    /* Send a RowDescription message */
    pq_beginmessage(&buf, 'T');
    pq_sendint(&buf, 2, 2);     /* 2 fields */

    /* first field */
    pq_sendstring(&buf, "filename");    /* col name */
    pq_sendint(&buf, 0, 4);     /* table oid */
    pq_sendint(&buf, 0, 2);     /* attnum */
    pq_sendint(&buf, TEXTOID, 4);       /* type oid */
    pq_sendint(&buf, -1, 2);    /* typlen */
    pq_sendint(&buf, 0, 4);     /* typmod */
    pq_sendint(&buf, 0, 2);     /* format code */

    /* second field */
    pq_sendstring(&buf, "content"); /* col name */
    pq_sendint(&buf, 0, 4);     /* table oid */
    pq_sendint(&buf, 0, 2);     /* attnum */
    pq_sendint(&buf, BYTEAOID, 4);      /* type oid */
    pq_sendint(&buf, -1, 2);    /* typlen */
    pq_sendint(&buf, 0, 4);     /* typmod */
    pq_sendint(&buf, 0, 2);     /* format code */
    pq_endmessage(&buf);

    /* Send a DataRow message */
    fname_len = strlen(histfname);
    pq_beginmessage(&buf, 'D');
    pq_sendint(&buf, 2, 2);     /* # of columns */
    pq_sendint(&buf, fname_len, 4); /* col1 len */
    pq_sendbytes(&buf, histfname, fname_len);

    fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
    if (fd < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not open file \"%s\": %m", path)));

    /* Determine file length and send it to client */
    histfilelen = lseek(fd, 0, SEEK_END);
    if (histfilelen < 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not seek to end of file \"%s\": %m", path)));
    if (lseek(fd, 0, SEEK_SET) != 0)
        ereport(ERROR,
                (errcode_for_file_access(),
                 errmsg("could not seek to beginning of file \"%s\": %m", path)));

    pq_sendint(&buf, histfilelen, 4);   /* col2 len */

    /*
     * Copy exactly histfilelen bytes into the message.  The column length
     * has already been written above, so the payload must match it.
     */
    bytesleft = histfilelen;
    while (bytesleft > 0)
    {
        char        rbuf[BLCKSZ];
        size_t      want = sizeof(rbuf);
        int         nread;

        /*
         * Never ask for more than is still owed: if the file were longer
         * than it was when we measured it, reading a full buffer would
         * append more bytes than the declared column length.
         */
        if (bytesleft < (off_t) want)
            want = (size_t) bytesleft;

        nread = read(fd, rbuf, want);
        if (nread < 0)
            ereport(ERROR,
                    (errcode_for_file_access(),
                     errmsg("could not read file \"%s\": %m",
                            path)));

        /*
         * A zero return is end-of-file, not an I/O error; errno is not set
         * in that case, so %m would print an unrelated message.
         */
        if (nread == 0)
            ereport(ERROR,
                    (errmsg("could not read file \"%s\": unexpected end of file",
                            path)));

        pq_sendbytes(&buf, rbuf, nread);
        bytesleft -= nread;
    }
    CloseTransientFile(fd);

    pq_endmessage(&buf);
}

static void StartReplication ( StartReplicationCmd *  cmd  )  [static]

Definition at line 407 of file walsender.c.

References am_cascading_walsender, Assert, buf, ereport, errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), INT8OID, list_free_deep(), WalSnd::mutex, pq_beginmessage(), pq_endmessage(), pq_flush(), pq_puttextmessage(), pq_sendbyte(), pq_sendbytes(), pq_sendint(), pq_sendstring(), proc_exit(), readTimeLineHistory(), replication_active, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, snprintf(), SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), ThisTimeLineID, StartReplicationCmd::timeline, tliSwitchPoint(), walsender_ready_to_stop, WalSndLoop(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

{
    /*
     * Handle a start-replication request described by 'cmd'.
     *
     * Picks the timeline to stream (cmd->timeline, or ThisTimeLineID when
     * cmd->timeline is 0), validates cmd->startpoint, switches the connection
     * into COPY-both mode and runs WalSndLoop() until streaming ends.  When a
     * historic timeline was streamed, a one-row result set naming the next
     * timeline is sent afterwards.  A CommandComplete message is always sent
     * last.  Validation failures are reported with ereport(ERROR).
     */
    StringInfoData buf;
    XLogRecPtr FlushPtr;

    /*
     * We assume here that we're logging enough information in the WAL for
     * log-shipping, since this is checked in PostmasterMain().
     *
     * NOTE: wal_level can only change at shutdown, so in most cases it is
     * difficult for there to be WAL data that we can still see that was
     * written at wal_level='minimal'.
     */

    /*
     * Select the timeline. If it was given explicitly by the client, use
     * that. Otherwise use the timeline of the last replayed record, which
     * is kept in ThisTimeLineID.
     *
     * The flush position is fetched first because, on a cascading walsender,
     * doing so also refreshes ThisTimeLineID, which the checks below rely on.
     */
    if (am_cascading_walsender)
    {
        /* this also updates ThisTimeLineID */
        FlushPtr = GetStandbyFlushRecPtr();
    }
    else
        FlushPtr = GetFlushRecPtr();

    if (cmd->timeline != 0)
    {
        XLogRecPtr  switchpoint;

        sendTimeLine = cmd->timeline;
        if (sendTimeLine == ThisTimeLineID)
        {
            /* Client asked for the timeline we are currently on. */
            sendTimeLineIsHistoric = false;
            sendTimeLineValidUpto = InvalidXLogRecPtr;
        }
        else
        {
            List       *timeLineHistory;

            sendTimeLineIsHistoric = true;

            /*
             * Check that the timeline the client requested exists, and the
             * requested start location is on that timeline.
             */
            timeLineHistory = readTimeLineHistory(ThisTimeLineID);
            switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
                                         &sendTimeLineNextTLI);
            list_free_deep(timeLineHistory);

            /*
             * Found the requested timeline in the history. Check that
             * requested startpoint is on that timeline in our history.
             *
             * This is quite loose on purpose. We only check that we didn't
             * fork off the requested timeline before the switchpoint. We don't
             * check that we switched *to* it before the requested starting
             * point. This is because the client can legitimately request to
             * start replication from the beginning of the WAL segment that
             * contains switchpoint, but on the new timeline, so that it
             * doesn't end up with a partial segment. If you ask for a too old
             * starting point, you'll get an error later when we fail to find
             * the requested WAL segment in pg_xlog.
             *
             * XXX: we could be more strict here and only allow a startpoint
             * that's older than the switchpoint, if it's still in the same
             * WAL segment.
             */
            if (!XLogRecPtrIsInvalid(switchpoint) &&
                switchpoint < cmd->startpoint)
            {
                ereport(ERROR,
                        (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
                                (uint32) (cmd->startpoint >> 32),
                                (uint32) (cmd->startpoint),
                                cmd->timeline),
                         errdetail("This server's history forked from timeline %u at %X/%X",
                                   cmd->timeline,
                                   (uint32) (switchpoint >> 32),
                                   (uint32) (switchpoint))));
            }
            /* The historic timeline can be streamed only up to this point. */
            sendTimeLineValidUpto = switchpoint;
        }
    }
    else
    {
        /* No timeline given: stream the current one. */
        sendTimeLine = ThisTimeLineID;
        sendTimeLineValidUpto = InvalidXLogRecPtr;
        sendTimeLineIsHistoric = false;
    }

    streamingDoneSending = streamingDoneReceiving = false;

    /* If there is nothing to stream, don't even enter COPY mode */
    if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
    {
        /*
         * When we first start replication the standby will be behind the primary.
         * For some applications, for example, synchronous replication, it is
         * important to have a clear state for this initial catchup mode, so we
         * can trigger actions when we change streaming state later. We may stay
         * in this state for a long time, which is exactly why we want to be able
         * to monitor whether or not we are still here.
         */
        WalSndSetState(WALSNDSTATE_CATCHUP);

        /* Send a CopyBothResponse message, and start streaming */
        pq_beginmessage(&buf, 'W');
        pq_sendbyte(&buf, 0);
        pq_sendint(&buf, 0, 2);
        pq_endmessage(&buf);
        pq_flush();

        /*
         * Don't allow a request to stream from a future point in WAL that
         * hasn't been flushed to disk in this server yet.
         *
         * NOTE(review): this check runs after the CopyBothResponse has
         * already been flushed to the client.
         */
        if (FlushPtr < cmd->startpoint)
        {
            ereport(ERROR,
                    (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
                            (uint32) (cmd->startpoint >> 32),
                            (uint32) (cmd->startpoint),
                            (uint32) (FlushPtr >> 32),
                            (uint32) (FlushPtr))));
        }

        /* Start streaming from the requested point */
        sentPtr = cmd->startpoint;

        /* Initialize shared memory status, too */
        {
            /* use volatile pointer to prevent code rearrangement */
            volatile WalSnd *walsnd = MyWalSnd;

            SpinLockAcquire(&walsnd->mutex);
            walsnd->sentPtr = sentPtr;
            SpinLockRelease(&walsnd->mutex);
        }

        SyncRepInitConfig();

        /* Main loop of walsender */
        replication_active = true;

        WalSndLoop();

        replication_active = false;
        /* A stop was requested while streaming: exit instead of returning. */
        if (walsender_ready_to_stop)
            proc_exit(0);
        WalSndSetState(WALSNDSTATE_STARTUP);

        /* WalSndLoop() returns normally only once both directions are done. */
        Assert(streamingDoneSending && streamingDoneReceiving);
    }

    /*
     * Copy is finished now. Send a single-row result set indicating the next
     * timeline.
     */
    if (sendTimeLineIsHistoric)
    {
        /* 10 decimal digits plus NUL; assumes TimeLineID is 32 bits wide */
        char        str[11];
        snprintf(str, sizeof(str), "%u", sendTimeLineNextTLI);

        pq_beginmessage(&buf, 'T'); /* RowDescription */
        pq_sendint(&buf, 1, 2);     /* 1 field */

        /* Field header */
        pq_sendstring(&buf, "next_tli");
        pq_sendint(&buf, 0, 4);     /* table oid */
        pq_sendint(&buf, 0, 2);     /* attnum */
        /*
         * int8 may seem like a surprising data type for this, but in theory
         * int4 would not be wide enough for this, as TimeLineID is unsigned.
         */
        pq_sendint(&buf, INT8OID, 4);   /* type oid */
        pq_sendint(&buf, -1, 2);    /* typlen */
        pq_sendint(&buf, 0, 4);     /* typmod */
        pq_sendint(&buf, 0, 2);     /* format code */
        pq_endmessage(&buf);

        /* Data row */
        pq_beginmessage(&buf, 'D');
        pq_sendint(&buf, 1, 2);     /* number of columns */
        pq_sendint(&buf, strlen(str), 4);   /* length */
        pq_sendbytes(&buf, str, strlen(str));
        pq_endmessage(&buf);
    }

    /* Send CommandComplete message */
    pq_puttextmessage('C', "START_STREAMING");
}

void WalSndErrorCleanup ( void   ) 

Definition at line 216 of file walsender.c.

References close, proc_exit(), replication_active, sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

{
    /*
     * Error-recovery hook for a walsender: release the WAL segment file
     * descriptor, leave replication mode, and either exit (if a stop was
     * already requested) or fall back to the startup state.
     */

    /* Drop the segment file, if one is still open. */
    if (sendFile >= 0)
    {
        close(sendFile);
        sendFile = -1;
    }

    /* Whatever we were streaming, we are not streaming it any more. */
    replication_active = false;

    /* A pending stop request wins over going back to idle. */
    if (walsender_ready_to_stop)
    {
        proc_exit(0);
    }

    /* Otherwise advertise that we are back in the startup state. */
    WalSndSetState(WALSNDSTATE_STARTUP);
}

static const char* WalSndGetStateString ( WalSndState  state  )  [static]

Definition at line 1816 of file walsender.c.

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

{
    /*
     * Map a walsender state to its display name.  Any value that is not one
     * of the four known states yields "UNKNOWN".
     */
    if (state == WALSNDSTATE_STARTUP)
        return "startup";

    if (state == WALSNDSTATE_BACKUP)
        return "backup";

    if (state == WALSNDSTATE_CATCHUP)
        return "catchup";

    if (state == WALSNDSTATE_STREAMING)
        return "streaming";

    return "UNKNOWN";
}

static void WalSndKeepalive ( bool  requestReply  )  [static]
static void WalSndKill ( int  code,
Datum  arg 
) [static]

Definition at line 1189 of file walsender.c.

References Assert, DisownLatch(), WalSnd::latch, NULL, and WalSnd::pid.

Referenced by InitWalSenderSlot().

{
    /*
     * Exit callback (registered from InitWalSenderSlot): give this process's
     * WalSnd slot back.  'code' and 'arg' are unused.
     */
    WalSnd     *slot = MyWalSnd;

    Assert(MyWalSnd != NULL);

    /*
     * Flag the slot as free and let go of its latch.  No lock is taken; the
     * assumption is that none is needed here.
     */
    slot->pid = 0;
    DisownLatch(&slot->latch);

    /* From now on this process owns no WalSnd slot. */
    MyWalSnd = NULL;
}

static void WalSndLastCycleHandler ( SIGNAL_ARGS   )  [static]

Definition at line 1696 of file walsender.c.

References WalSnd::latch, MyProcPid, replication_active, SetLatch(), and walsender_ready_to_stop.

Referenced by WalSndSignals().

{
    /*
     * SIGUSR2 handler: request one final send cycle followed by shutdown.
     * errno is preserved across the handler.
     */
    int         saved_errno = errno;

    /*
     * Before replication has started there is nothing to drain, so behave
     * as if SIGTERM had arrived.  While replication is active we merely
     * raise the stop flag and poke the main loop, which sends whatever WAL
     * is outstanding and then exits gracefully.
     */
    if (!replication_active)
    {
        kill(MyProcPid, SIGTERM);
    }

    walsender_ready_to_stop = true;

    if (MyWalSnd != NULL)
    {
        SetLatch(&MyWalSnd->latch);
    }

    errno = saved_errno;
}

static void WalSndLoop ( void   )  [static]

Definition at line 937 of file walsender.c.

References application_name, CHECK_FOR_INTERRUPTS, COMMERROR, DEBUG1, DestRemote, EndCommand(), ereport, errmsg(), GetCurrentTimestamp(), got_SIGHUP, ImmediateInterruptOK, initStringInfo(), last_reply_timestamp, WalSnd::latch, MyProcPort, PGC_SIGHUP, ping_sent, PostmasterIsAlive(), pq_flush(), pq_flush_if_writable(), pq_is_send_pending(), proc_exit(), ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), sleeptime, Port::sock, WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), TimestampTzPlusMilliseconds, WaitLatchOrSocket(), wal_sender_timeout, walsender_ready_to_stop, WalSndKeepalive(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, whereToSendOutput, WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_TIMEOUT, and XLogSend().

Referenced by StartReplication().

{
    /*
     * Main streaming loop of a walsender.
     *
     * Repeatedly processes client replies, sends WAL via XLogSend(), flushes
     * output and sleeps on the latch/socket.  There are exactly two ways out:
     *
     *  - normal return (via "break"), taken only once both sides have sent
     *    CopyDone and the output buffer is empty; the caller relies on
     *    streamingDoneSending && streamingDoneReceiving being true then;
     *
     *  - process exit, either after a stop request has been fully served or
     *    through the send_failure path on a send error / replication timeout.
     *
     * Every send failure must therefore use "goto send_failure", never
     * "break".
     */
    bool        caughtup = false;

    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once to reduce palloc overhead.
     */
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);

    /* Initialize the last reply timestamp */
    last_reply_timestamp = GetCurrentTimestamp();
    ping_sent = false;

    /*
     * Loop until we reach the end of this timeline or the client requests
     * to stop streaming.
     */
    for (;;)
    {
        /* Clear any already-pending wakeups */
        ResetLatch(&MyWalSnd->latch);

        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
         * necessity for manual cleanup of all postmaster children.
         */
        if (!PostmasterIsAlive())
            exit(1);

        /* Process any requests or signals received recently */
        if (got_SIGHUP)
        {
            got_SIGHUP = false;
            ProcessConfigFile(PGC_SIGHUP);
            SyncRepInitConfig();
        }

        CHECK_FOR_INTERRUPTS();

        /* Check for input from the client */
        ProcessRepliesIfAny();

        /*
         * If we have received CopyDone from the client, sent CopyDone
         * ourselves, and the output buffer is empty, it's time to exit
         * streaming.  This is the only normal way out of the loop.
         */
        if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
            break;

        /*
         * If we don't have any pending data in the output buffer, try to send
         * some more.  If there is some, we don't bother to call XLogSend
         * again until we've flushed it ... but we'd better assume we are not
         * caught up.
         */
        if (!pq_is_send_pending())
            XLogSend(&caughtup);
        else
            caughtup = false;

        /* Try to flush pending output to the client */
        if (pq_flush_if_writable() != 0)
            goto send_failure;

        /* If nothing remains to be sent right now ... */
        if (caughtup && !pq_is_send_pending())
        {
            /*
             * If we're in catchup state, move to streaming.  This is an
             * important state change for users to know about, since before
             * this point data loss might occur if the primary dies and we
             * need to failover to the standby. The state change is also
             * important for synchronous replication, since commits that
             * started to wait at that point might wait for some time.
             */
            if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
            {
                ereport(DEBUG1,
                     (errmsg("standby \"%s\" has now caught up with primary",
                             application_name)));
                WalSndSetState(WALSNDSTATE_STREAMING);
            }

            /*
             * When SIGUSR2 arrives, we send any outstanding logs up to the
             * shutdown checkpoint record (i.e., the latest record) and exit.
             * This may be a normal termination at shutdown, or a promotion,
             * the walsender is not sure which.
             */
            if (walsender_ready_to_stop)
            {
                /* ... let's just be real sure we're caught up ... */
                XLogSend(&caughtup);
                if (caughtup && !pq_is_send_pending())
                {
                    /* Inform the standby that XLOG streaming is done */
                    EndCommand("COPY 0", DestRemote);
                    pq_flush();

                    proc_exit(0);
                }
            }
        }

        /*
         * We don't block if not caught up, unless there is unsent data
         * pending in which case we'd better block until the socket is
         * write-ready.  This test is only needed for the case where XLogSend
         * loaded a subset of the available data but then pq_flush_if_writable
         * flushed it all --- we should immediately try to send more.
         */
        if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
        {
            TimestampTz timeout = 0;
            long        sleeptime = 10000;      /* 10 s */
            int         wakeEvents;

            wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT |
                WL_SOCKET_READABLE;

            if (pq_is_send_pending())
                wakeEvents |= WL_SOCKET_WRITEABLE;
            else if (wal_sender_timeout > 0 && !ping_sent)
            {
                /*
                 * If half of wal_sender_timeout has lapsed without receiving
                 * any reply from standby, send a keep-alive message to standby
                 * requesting an immediate reply.
                 */
                timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
                                                      wal_sender_timeout / 2);
                if (GetCurrentTimestamp() >= timeout)
                {
                    WalSndKeepalive(true);
                    ping_sent = true;

                    /*
                     * Try to flush pending output to the client.  A failure
                     * here is a send failure like any other: leaving the
                     * loop with "break" would look like a normal end of
                     * streaming to the caller even though neither side has
                     * sent CopyDone.
                     */
                    if (pq_flush_if_writable() != 0)
                        goto send_failure;
                }
            }

            /* Determine time until replication timeout */
            if (wal_sender_timeout > 0)
            {
                timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
                                                      wal_sender_timeout);
                sleeptime = 1 + (wal_sender_timeout / 10);
            }

            /* Sleep until something happens or we time out */
            ImmediateInterruptOK = true;
            CHECK_FOR_INTERRUPTS();
            WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
                              MyProcPort->sock, sleeptime);
            ImmediateInterruptOK = false;

            /*
             * Check for replication timeout.  Note we ignore the corner case
             * possibility that the client replied just as we reached the
             * timeout ... he's supposed to reply *before* that.
             */
            if (wal_sender_timeout > 0 && GetCurrentTimestamp() >= timeout)
            {
                /*
                 * Since typically expiration of replication timeout means
                 * communication problem, we don't send the error message to
                 * the standby.
                 */
                ereport(COMMERROR,
                        (errmsg("terminating walsender process due to replication timeout")));
                goto send_failure;
            }
        }
    }
    return;

send_failure:
    /*
     * Get here on send failure.  Clean up and exit.
     *
     * Reset whereToSendOutput to prevent ereport from attempting to send any
     * more messages to the standby.
     */
    if (whereToSendOutput == DestRemote)
        whereToSendOutput = DestNone;

    proc_exit(0);
    abort();                    /* keep the compiler quiet */
}

void WalSndRqstFileReload ( void   ) 

Definition at line 1652 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

{
    /*
     * Set the needreload flag of every WalSnd slot that is in use (nonzero
     * pid).  XLogRead() in a cascading walsender consumes the flag and
     * reopens its WAL file.
     */
    int         slotno;

    for (slotno = 0; slotno < max_wal_senders; slotno++)
    {
        /* volatile pointer keeps the compiler from rearranging accesses */
        volatile WalSnd *slot = &WalSndCtl->walsnds[slotno];

        /* Only slots with an owner are flagged; the flag is set under lock. */
        if (slot->pid != 0)
        {
            SpinLockAcquire(&slot->mutex);
            slot->needreload = true;
            SpinLockRelease(&slot->mutex);
        }
    }
}

void WalSndSetState ( WalSndState  state  ) 

Definition at line 1796 of file walsender.c.

References am_walsender, Assert, WalSnd::mutex, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by SendBaseBackup(), StartReplication(), WalSndErrorCleanup(), and WalSndLoop().

{
    /*
     * Publish a new state for this walsender in its shared-memory slot.
     * Nothing is written (and no lock taken) when the state is unchanged.
     */

    /* volatile pointer keeps the compiler from rearranging accesses */
    volatile WalSnd *slot = MyWalSnd;

    Assert(am_walsender);

    if (slot->state != state)
    {
        SpinLockAcquire(&slot->mutex);
        slot->state = state;
        SpinLockRelease(&slot->mutex);
    }
}

void WalSndShmemInit ( void   ) 

Definition at line 1753 of file walsender.c.

References i, InitSharedLatch(), WalSnd::latch, max_wal_senders, MemSet, WalSnd::mutex, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

{
    /*
     * Attach to the walsender control structure in shared memory, and
     * initialize it if ShmemInitStruct() reports it did not exist yet.
     */
    bool        already_present;
    int         n;

    WalSndCtl = (WalSndCtlData *)
        ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &already_present);

    /* Already initialized earlier: nothing more to do. */
    if (already_present)
        return;

    /* First time through: start from an all-zero structure. */
    MemSet(WalSndCtl, 0, WalSndShmemSize());

    /* One sync-rep wait queue per wait mode. */
    for (n = 0; n < NUM_SYNC_REP_WAIT_MODE; n++)
    {
        SHMQueueInit(&(WalSndCtl->SyncRepQueue[n]));
    }

    /* Per-slot spinlock and latch. */
    for (n = 0; n < max_wal_senders; n++)
    {
        WalSnd     *slot = &WalSndCtl->walsnds[n];

        SpinLockInit(&slot->mutex);
        InitSharedLatch(&slot->latch);
    }
}

Size WalSndShmemSize ( void   ) 

Definition at line 1741 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

{
    /*
     * Shared memory needed for walsender control data: the fixed part of
     * WalSndCtlData up to its walsnds array, plus one WalSnd per allowed
     * walsender.  The arithmetic goes through add_size()/mul_size().
     */
    Size        fixed_part = offsetof(WalSndCtlData, walsnds);
    Size        slot_array = mul_size(max_wal_senders, sizeof(WalSnd));

    return add_size(fixed_part, slot_array);
}

static void WalSndSigHupHandler ( SIGNAL_ARGS   )  [static]

Definition at line 1672 of file walsender.c.

References got_SIGHUP, WalSnd::latch, and SetLatch().

Referenced by WalSndSignals().

{
    /*
     * SIGHUP handler: note that a config reload is wanted and wake the main
     * loop, which acts on got_SIGHUP.  errno is preserved across the handler.
     */
    int         saved_errno = errno;

    got_SIGHUP = true;

    if (MyWalSnd != NULL)
    {
        SetLatch(&MyWalSnd->latch);
    }

    errno = saved_errno;
}

void WalSndSignals ( void   ) 

Definition at line 1717 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

{
    /*
     * Install the signal dispositions used by a walsender process.
     * SIGHUP, SIGUSR1 and SIGUSR2 get walsender-specific handlers; the rest
     * are routed to shared handlers, ignored, or reset to their defaults.
     */

    /* Set up signal handlers */
    pqsignal(SIGHUP, WalSndSigHupHandler);      /* set flag to read config
                                                 * file */
    pqsignal(SIGINT, SIG_IGN);  /* not used */
    pqsignal(SIGTERM, die);                     /* request shutdown */
    pqsignal(SIGQUIT, quickdie);                /* hard crash time */
    InitializeTimeouts();       /* establishes SIGALRM handler */
    pqsignal(SIGPIPE, SIG_IGN);                 /* ignored */
    pqsignal(SIGUSR1, WalSndXLogSendHandler);   /* request WAL sending */
    pqsignal(SIGUSR2, WalSndLastCycleHandler);  /* request a last cycle and
                                                 * shutdown */

    /* Reset some signals that are accepted by postmaster but not here */
    pqsignal(SIGCHLD, SIG_DFL);
    pqsignal(SIGTTIN, SIG_DFL);
    pqsignal(SIGTTOU, SIG_DFL);
    pqsignal(SIGCONT, SIG_DFL);
    pqsignal(SIGWINCH, SIG_DFL);
}

void WalSndWakeup ( void   ) 

Definition at line 1786 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, SetLatch(), and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

{
    /*
     * Set the latch of every WalSnd slot, in use or not, so that any
     * walsender waiting on its latch wakes up.
     */
    int         slotno = 0;

    while (slotno < max_wal_senders)
    {
        SetLatch(&WalSndCtl->walsnds[slotno].latch);
        slotno++;
    }
}

static void WalSndXLogSendHandler ( SIGNAL_ARGS   )  [static]

Definition at line 1685 of file walsender.c.

References latch_sigusr1_handler().

Referenced by WalSndSignals().

{
    /*
     * SIGUSR1 handler: hand the signal to the latch machinery.  errno is
     * preserved across the handler.
     */
    int         saved_errno = errno;

    latch_sigusr1_handler();

    errno = saved_errno;
}

static void XLogRead ( char *  buf,
XLogRecPtr  startptr,
Size  count 
) [static]

Definition at line 1216 of file walsender.c.

References am_cascading_walsender, BasicOpenFile(), CheckXLogRemoved(), close, curFileTimeLine, ereport, errcode_for_file_access(), errmsg(), ERROR, WalSnd::mutex, WalSnd::needreload, PG_BINARY, read, sendFile, sendOff, sendSegNo, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, SpinLockAcquire, SpinLockRelease, ThisTimeLineID, XLByteInSeg, XLByteToSeg, XLogFileNameP(), XLogFilePath, and XLogSegSize.

Referenced by XLogSend().

{
    /*
     * Read 'count' bytes of WAL, starting at 'startptr', into 'buf'.
     *
     * The read may span several WAL segment files.  The currently open
     * segment is tracked in the file-level variables sendFile, sendSegNo and
     * sendOff, so consecutive calls can continue in the same file without
     * reopening or seeking.  Any open/seek/read failure, including a read
     * that returns zero bytes, is raised with ereport(ERROR).
     *
     * After the data is in the buffer it is re-validated; in a cascading
     * walsender the whole read is redone if a reload was requested meanwhile.
     */
    char       *p;
    XLogRecPtr  recptr;
    Size        nbytes;
    XLogSegNo   segno;

retry:
    /* (Re)start from the beginning of the request. */
    p = buf;
    recptr = startptr;
    nbytes = count;

    while (nbytes > 0)
    {
        uint32      startoff;
        int         segbytes;
        int         readbytes;

        /* Byte offset of recptr within its segment file */
        startoff = recptr % XLogSegSize;

        if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
        {
            char        path[MAXPGPATH];

            /* Switch to another logfile segment */
            if (sendFile >= 0)
                close(sendFile);

            XLByteToSeg(recptr, sendSegNo);

            /*-------
             * When reading from a historic timeline, and there is a timeline
             * switch within this segment, read from the WAL segment belonging
             * to the new timeline.
             *
             * For example, imagine that this server is currently on timeline
             * 5, and we're streaming timeline 4. The switch from timeline 4
             * to 5 happened at 0/13002088. In pg_xlog, we have these files:
             *
             * ...
             * 000000040000000000000012
             * 000000040000000000000013
             * 000000050000000000000013
             * 000000050000000000000014
             * ...
             *
             * In this situation, when requested to send the WAL from
             * segment 0x13, on timeline 4, we read the WAL from file
             * 000000050000000000000013. Archive recovery prefers files from
             * newer timelines, so if the segment was restored from the
             * archive on this server, the file belonging to the old timeline,
             * 000000040000000000000013, might not exist. Their contents are
             * equal up to the switchpoint, because at a timeline switch, the
             * used portion of the old segment is copied to the new file.
             *-------
             */
            curFileTimeLine = sendTimeLine;
            if (sendTimeLineIsHistoric)
            {
                XLogSegNo endSegNo;

                XLByteToSeg(sendTimeLineValidUpto, endSegNo);
                if (sendSegNo == endSegNo)
                    curFileTimeLine = sendTimeLineNextTLI;
            }

            XLogFilePath(path, curFileTimeLine, sendSegNo);

            sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
            if (sendFile < 0)
            {
                /*
                 * If the file is not found, assume it's because the standby
                 * asked for a too old WAL segment that has already been
                 * removed or recycled.
                 */
                if (errno == ENOENT)
                    ereport(ERROR,
                            (errcode_for_file_access(),
                             errmsg("requested WAL segment %s has already been removed",
                                    XLogFileNameP(curFileTimeLine, sendSegNo))));
                else
                    ereport(ERROR,
                            (errcode_for_file_access(),
                             errmsg("could not open file \"%s\": %m",
                                    path)));
            }
            /* A freshly opened file is positioned at its start. */
            sendOff = 0;
        }

        /* Need to seek in the file? */
        if (sendOff != startoff)
        {
            if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
                ereport(ERROR,
                        (errcode_for_file_access(),
                         errmsg("could not seek in log segment %s to offset %u: %m",
                                XLogFileNameP(curFileTimeLine, sendSegNo),
                                startoff)));
            sendOff = startoff;
        }

        /* How many bytes are within this segment? */
        if (nbytes > (XLogSegSize - startoff))
            segbytes = XLogSegSize - startoff;
        else
            segbytes = nbytes;

        /* A short read is fine (the loop continues); zero or error is not. */
        readbytes = read(sendFile, p, segbytes);
        if (readbytes <= 0)
        {
            ereport(ERROR,
                    (errcode_for_file_access(),
            errmsg("could not read from log segment %s, offset %u, length %lu: %m",
                   XLogFileNameP(curFileTimeLine, sendSegNo),
                   sendOff, (unsigned long) segbytes)));
        }

        /* Update state for read */
        recptr += readbytes;

        sendOff += readbytes;
        nbytes -= readbytes;
        p += readbytes;
    }

    /*
     * After reading into the buffer, check that what we read was valid. We do
     * this after reading, because even though the segment was present when we
     * opened it, it might get recycled or removed while we read it. The
     * read() succeeds in that case, but the data we tried to read might
     * already have been overwritten with new WAL records.
     */
    XLByteToSeg(startptr, segno);
    CheckXLogRemoved(segno, ThisTimeLineID);

    /*
     * During recovery, the currently-open WAL file might be replaced with the
     * file of the same name retrieved from archive. So we always need to
     * check what we read was valid after reading into the buffer. If it's
     * invalid, we try to open and read the file again.
     */
    if (am_cascading_walsender)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = MyWalSnd;
        bool        reload;

        /* Fetch and clear the reload request under the slot's spinlock. */
        SpinLockAcquire(&walsnd->mutex);
        reload = walsnd->needreload;
        walsnd->needreload = false;
        SpinLockRelease(&walsnd->mutex);

        if (reload && sendFile >= 0)
        {
            /* Forget the open file so the retry reopens it by name. */
            close(sendFile);
            sendFile = -1;

            goto retry;
        }
    }
}

static void XLogSend ( bool *  caughtup )  [static]

Definition at line 1388 of file walsender.c.

References am_cascading_walsender, Assert, close, StringInfoData::data, elog, enlargeStringInfo(), ERROR, GetCurrentIntegerTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), StringInfoData::len, list_free_deep(), MAX_SEND_SIZE, WalSnd::mutex, NULL, pq_putmessage_noblock(), pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), sendFile, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WalSnd::sentPtr, sentPtr, set_ps_display(), snprintf(), SpinLockAcquire, SpinLockRelease, streamingDoneSending, ThisTimeLineID, tliSwitchPoint(), update_process_title, and XLogRead().

Referenced by WalSndLoop().

{
    /*
     * Read the next slice of WAL, starting at sentPtr, and queue it to the
     * client as a single 'w' message wrapped in a 'd' message. At most
     * MAX_SEND_SIZE bytes are sent per call.
     *
     * *caughtup is an output flag. It is set to true when there is nothing
     * (more) to send after this call: streaming is already done, the end of
     * a historic timeline has been reached, sentPtr has reached the request
     * pointer, or this message reaches the request pointer on a non-historic
     * timeline. Otherwise it is set to false.
     */
    XLogRecPtr  SendRqstPtr;
    XLogRecPtr  startptr;
    XLogRecPtr  endptr;
    Size        nbytes;

    /* CopyDone has already been queued on an earlier call; nothing to do. */
    if (streamingDoneSending)
    {
        *caughtup = true;
        return;
    }

    /* Figure out how far we can safely send the WAL. */
    if (sendTimeLineIsHistoric)
    {
        /*
         * Streaming an old timeline that's in this server's history, but is
         * not the one we're currently inserting or replaying. It can be
         * streamed up to the point where we switched off that timeline.
         */
        SendRqstPtr = sendTimeLineValidUpto;
    }
    else if (am_cascading_walsender)
    {
        /*
         * Streaming the latest timeline on a standby.
         *
         * Attempt to send all WAL that has already been replayed, so that
         * we know it's valid. If we're receiving WAL through streaming
         * replication, it's also OK to send any WAL that has been received
         * but not replayed.
         *
         * The timeline we're recovering from can change, or we can be
         * promoted. In either case, the current timeline becomes historic.
         * We need to detect that so that we don't try to stream past the
         * point where we switched to another timeline. We check for promotion
         * or timeline switch after calculating FlushPtr, to avoid a race
         * condition: if the timeline becomes historic just after we checked
         * that it was still current, it's still OK to stream it up to the
         * FlushPtr that was calculated before it became historic.
         */
        bool        becameHistoric = false;

        /* Must be computed before the promotion/timeline checks below. */
        SendRqstPtr = GetStandbyFlushRecPtr();

        if (!RecoveryInProgress())
        {
            /*
             * We have been promoted. RecoveryInProgress() updated
             * ThisTimeLineID to the new current timeline.
             */
            am_cascading_walsender = false;
            becameHistoric = true;
        }
        else
        {
            /*
             * Still a cascading standby. But is the timeline we're sending
             * still the one recovery is recovering from? ThisTimeLineID was
             * updated by the GetStandbyFlushRecPtr() call above.
             */
            if (sendTimeLine != ThisTimeLineID)
                becameHistoric = true;
        }

        if (becameHistoric)
        {
            /*
             * The timeline we were sending has become historic. Read the
             * timeline history file of the new timeline to see where exactly
             * we forked off from the timeline we were sending.
             */
            List       *history;

            history = readTimeLineHistory(ThisTimeLineID);
            sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
            Assert(sentPtr <= sendTimeLineValidUpto);
            Assert(sendTimeLine < sendTimeLineNextTLI);
            list_free_deep(history);

            /*
             * the current send pointer should be <= the switchpoint
             *
             * NOTE(review): the first Assert above tests the same condition,
             * so with assertions enabled it would presumably trip before this
             * elog is reached; the elog covers builds without assertions.
             */
            if (!(sentPtr <= sendTimeLineValidUpto))
                elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
                     sendTimeLine,
                     (uint32) (sendTimeLineValidUpto >> 32),
                     (uint32) sendTimeLineValidUpto,
                     (uint32) (sentPtr >> 32),
                     (uint32) sentPtr);

            sendTimeLineIsHistoric = true;

            /*
             * Replace the flush pointer computed above with the switchpoint,
             * so that we never stream past the end of the old timeline.
             */
            SendRqstPtr = sendTimeLineValidUpto;
        }
    }
    else
    {
        /*
         * Streaming the current timeline on a master.
         *
         * Attempt to send all data that's already been written out and
         * fsync'd to disk.  We cannot go further than what's been written out
         * given the current implementation of XLogRead().  And in any case
         * it's unsafe to send WAL that is not securely down to disk on the
         * master: if the master subsequently crashes and restarts, slaves
         * must not have applied any WAL that gets lost on the master.
         */
        SendRqstPtr = GetFlushRecPtr();
    }

    /*
     * If this is a historic timeline and we've reached the point where we
     * forked to the next timeline, stop streaming.
     */
    if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
    {
        /* close the current file. */
        if (sendFile >= 0)
            close(sendFile);
        sendFile = -1;

        /* Send CopyDone */
        pq_putmessage_noblock('c', NULL, 0);
        streamingDoneSending = true;

        *caughtup = true;
        return;
    }

    /*
     * Do we have any work to do?
     *
     * sentPtr is expected never to exceed SendRqstPtr (hence the Assert);
     * without assertions, sentPtr > SendRqstPtr is also treated as "caught
     * up" by the check below.
     */
    Assert(sentPtr <= SendRqstPtr);
    if (SendRqstPtr <= sentPtr)
    {
        *caughtup = true;
        return;
    }

    /*
     * Figure out how much to send in one message. If there's no more than
     * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
     * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
     *
     * The rounding is not only for performance reasons. Walreceiver relies on
     * the fact that we never split a WAL record across two messages. Since a
     * long WAL record is split at page boundary into continuation records,
     * page boundary is always a safe cut-off point. We also assume that
     * SendRqstPtr never points to the middle of a WAL record.
     */
    startptr = sentPtr;
    endptr = startptr;
    endptr += MAX_SEND_SIZE;

    /* if we went beyond SendRqstPtr, back off */
    if (SendRqstPtr <= endptr)
    {
        endptr = SendRqstPtr;

        /*
         * On a historic timeline we report "not caught up" even though this
         * message reaches the request pointer -- presumably so that the
         * caller invokes us again and the end-of-timeline branch above gets
         * to send CopyDone (TODO confirm against WalSndLoop()).
         */
        if (sendTimeLineIsHistoric)
            *caughtup = false;
        else
            *caughtup = true;
    }
    else
    {
        /* round down to page boundary. */
        endptr -= (endptr % XLOG_BLCKSZ);
        *caughtup = false;
    }

    nbytes = endptr - startptr;
    Assert(nbytes <= MAX_SEND_SIZE);

    /*
     * OK to read and send the slice.
     *
     * Message layout: 'w' byte, then three 64-bit fields (dataStart, walEnd,
     * sendtime), then the raw WAL bytes.
     */
    resetStringInfo(&output_message);
    pq_sendbyte(&output_message, 'w');

    pq_sendint64(&output_message, startptr);    /* dataStart */
    pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
    pq_sendint64(&output_message, 0);           /* sendtime, filled in last */

    /*
     * Read the log directly into the output buffer to avoid extra memcpy
     * calls.
     */
    enlargeStringInfo(&output_message, nbytes);
    XLogRead(&output_message.data[output_message.len], startptr, nbytes);
    /* The bytes were written behind StringInfo's back; fix up len and NUL. */
    output_message.len += nbytes;
    output_message.data[output_message.len] = '\0';

    /*
     * Fill the send timestamp last, so that it is taken as late as possible.
     *
     * The timestamp is serialized into tmpbuf and then copied over the zero
     * placeholder, which sits after the 'w' byte and the two preceding
     * 64-bit fields (dataStart and walEnd).
     */
    resetStringInfo(&tmpbuf);
    pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
    memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
           tmpbuf.data, sizeof(int64));

    pq_putmessage_noblock('d', output_message.data, output_message.len);

    /* Advance our local send position past the slice just queued. */
    sentPtr = endptr;

    /* Update shared memory status */
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = MyWalSnd;

        SpinLockAcquire(&walsnd->mutex);
        walsnd->sentPtr = sentPtr;
        SpinLockRelease(&walsnd->mutex);
    }

    /* Report progress of XLOG streaming in PS display */
    if (update_process_title)
    {
        char        activitymsg[50];

        snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
                 (uint32) (sentPtr >> 32), (uint32) sentPtr);
        set_ps_display(activitymsg, false);
    }

    return;
}


Variable Documentation

bool am_walsender = false

Definition at line 113 of file walsender.c.

Referenced by XLogRead().

volatile sig_atomic_t got_SIGHUP = false [static]

Definition at line 154 of file walsender.c.

Referenced by WalSndLoop(), and WalSndSigHupHandler().

Definition at line 140 of file walsender.c.

Referenced by ProcessRepliesIfAny(), and WalSndLoop().

int max_wal_senders = 0
WalSnd* MyWalSnd = NULL

Definition at line 88 of file walsender.c.

Referenced by SyncRepInitConfig(), and SyncRepReleaseWaiters().

Definition at line 133 of file walsender.c.

bool ping_sent = false [static]

Definition at line 142 of file walsender.c.

Referenced by ProcessRepliesIfAny(), WalReceiverMain(), and WalSndLoop().

volatile sig_atomic_t replication_active = false [static]

Definition at line 163 of file walsender.c.

Referenced by StartReplication(), WalSndErrorCleanup(), and WalSndLastCycleHandler().

Definition at line 134 of file walsender.c.

int sendFile = -1 [static]

Definition at line 108 of file walsender.c.

Referenced by WalSndErrorCleanup(), XLogDumpXLogRead(), XLogRead(), and XLogSend().

uint32 sendOff = 0 [static]

Definition at line 110 of file walsender.c.

Referenced by XLogDumpXLogRead(), and XLogRead().

XLogSegNo sendSegNo = 0 [static]

Definition at line 109 of file walsender.c.

Referenced by XLogDumpXLogRead(), and XLogRead().

TimeLineID sendTimeLine = 0 [static]

Definition at line 121 of file walsender.c.

Referenced by StartReplication(), XLogRead(), and XLogSend().

bool sendTimeLineIsHistoric = false [static]

Definition at line 123 of file walsender.c.

Referenced by StartReplication(), XLogRead(), and XLogSend().

Definition at line 122 of file walsender.c.

Referenced by StartReplication(), XLogRead(), and XLogSend().

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr [static]

Definition at line 124 of file walsender.c.

Referenced by StartReplication(), XLogRead(), and XLogSend().

XLogRecPtr sentPtr = 0 [static]

Definition at line 130 of file walsender.c.

Referenced by pg_stat_get_wal_senders(), StartReplication(), WalSndKeepalive(), and XLogSend().

Definition at line 151 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), and WalSndLoop().

Definition at line 150 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and XLogSend().

Definition at line 102 of file walsender.c.

int wal_sender_timeout = 60 * 1000

Definition at line 97 of file walsender.c.

Referenced by WalSndLoop().

volatile sig_atomic_t walsender_ready_to_stop = false [static]