Header And Logo

PostgreSQL
| The world's most advanced open source database.

Defines | Functions | Variables

walsender.h File Reference

#include <signal.h>
#include "fmgr.h"
Include dependency graph for walsender.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Defines

#define WalSndWakeupRequest()   do { wake_wal_senders = true; } while (0)
#define WalSndWakeupProcessRequests()

Functions

void InitWalSender (void)
void exec_replication_command (const char *query_string)
void WalSndErrorCleanup (void)
void WalSndSignals (void)
Size WalSndShmemSize (void)
void WalSndShmemInit (void)
void WalSndWakeup (void)
void WalSndRqstFileReload (void)
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)

Variables

bool am_walsender
bool am_cascading_walsender
bool wake_wal_senders
int max_wal_senders
int wal_sender_timeout

Define Documentation

#define WalSndWakeupProcessRequests (  ) 
Value:
do                                      \
    {                                       \
        if (wake_wal_senders)               \
        {                                   \
            wake_wal_senders = false;       \
            if (max_wal_senders > 0)        \
                WalSndWakeup();             \
        }                                   \
    } while (0)

Definition at line 51 of file walsender.h.

Referenced by XLogBackgroundFlush(), XLogFlush(), and XLogInsert().

#define WalSndWakeupRequest (  )     do { wake_wal_senders = true; } while (0)

Definition at line 45 of file walsender.h.

Referenced by XLogWrite().


Function Documentation

void exec_replication_command ( const char *  query_string  ) 

Definition at line 606 of file walsender.c.

References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CurrentMemoryContext, DEBUG1, DestRemote, elog, EndCommand(), ereport, errcode(), errmsg_internal(), ERROR, IdentifySystem(), MemoryContextDelete(), MemoryContextSwitchTo(), replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), StartReplication(), T_BaseBackupCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, TimeLineHistoryCmd::type, and Node::type.

Referenced by PostgresMain().

{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
    MemoryContext old_context;

    elog(DEBUG1, "received replication command: %s", cmd_string);

    CHECK_FOR_INTERRUPTS();

    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_MINSIZE,
                                        ALLOCSET_DEFAULT_INITSIZE,
                                        ALLOCSET_DEFAULT_MAXSIZE);
    old_context = MemoryContextSwitchTo(cmd_context);

    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 (errmsg_internal("replication command parser returned %d",
                                  parse_rc))));

    cmd_node = replication_parse_result;

    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            IdentifySystem();
            break;

        case T_StartReplicationCmd:
            StartReplication((StartReplicationCmd *) cmd_node);
            break;

        case T_BaseBackupCmd:
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;

        case T_TimeLineHistoryCmd:
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;

        default:
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
    }

    /* done */
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);

    /* Send CommandComplete message */
    EndCommand("SELECT", DestRemote);
}

void InitWalSender ( void   ) 

Definition at line 187 of file walsender.c.

References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().

Referenced by PostgresMain().

{
    am_cascading_walsender = RecoveryInProgress();

    /* Create a per-walsender data structure in shared memory */
    InitWalSenderSlot();

    /* Set up resource owner */
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");

    /*
     * Let postmaster know that we're a WAL sender. Once we've declared us as
     * a WAL sender process, postmaster will let us outlive the bgwriter and
     * kill us last in the shutdown sequence, so we get a chance to stream all
     * remaining WAL at shutdown, including the shutdown checkpoint. Note that
     * there's no going back, and we mustn't write any WAL records after this.
     */
    MarkPostmasterChildWalSender();
    SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
}

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS   ) 

Definition at line 1838 of file walsender.c.

References ReturnSetInfo::allowedModes, WalSnd::apply, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, get_call_result_type(), i, Int32GetDatum, IsA, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, palloc(), pfree(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, snprintf(), SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), WalSnd::sync_standby_priority, SyncRepLock, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), values, WalSndGetStateString(), WalSndCtlData::walsnds, WALSNDSTATE_STREAMING, work_mem, WalSnd::write, and XLogRecPtrIsInvalid.

{
#define PG_STAT_GET_WAL_SENDERS_COLS    8
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
    MemoryContext per_query_ctx;
    MemoryContext oldcontext;
    int        *sync_priority;
    int         priority = 0;
    int         sync_standby = -1;
    int         i;

    /* check to see if caller supports us returning a tuplestore */
    if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("set-valued function called in context that cannot accept a set")));
    if (!(rsinfo->allowedModes & SFRM_Materialize))
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("materialize mode required, but it is not " \
                        "allowed in this context")));

    /* Build a tuple descriptor for our result type */
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");

    per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    oldcontext = MemoryContextSwitchTo(per_query_ctx);

    tupstore = tuplestore_begin_heap(true, false, work_mem);
    rsinfo->returnMode = SFRM_Materialize;
    rsinfo->setResult = tupstore;
    rsinfo->setDesc = tupdesc;

    MemoryContextSwitchTo(oldcontext);

    /*
     * Get the priorities of sync standbys all in one go, to minimise lock
     * acquisitions and to allow us to evaluate who is the current sync
     * standby. This code must match the code in SyncRepReleaseWaiters().
     */
    sync_priority = palloc(sizeof(int) * max_wal_senders);
    LWLockAcquire(SyncRepLock, LW_SHARED);
    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

        if (walsnd->pid != 0)
        {
            /*
             * Treat a standby such as a pg_basebackup background process
             * which always returns an invalid flush location, as an
             * asynchronous standby.
             */
            sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
                0 : walsnd->sync_standby_priority;

            if (walsnd->state == WALSNDSTATE_STREAMING &&
                walsnd->sync_standby_priority > 0 &&
                (priority == 0 ||
                 priority > walsnd->sync_standby_priority) &&
                !XLogRecPtrIsInvalid(walsnd->flush))
            {
                priority = walsnd->sync_standby_priority;
                sync_standby = i;
            }
        }
    }
    LWLockRelease(SyncRepLock);

    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
        char        location[MAXFNAMELEN];
        XLogRecPtr  sentPtr;
        XLogRecPtr  write;
        XLogRecPtr  flush;
        XLogRecPtr  apply;
        WalSndState state;
        Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
        bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];

        if (walsnd->pid == 0)
            continue;

        SpinLockAcquire(&walsnd->mutex);
        sentPtr = walsnd->sentPtr;
        state = walsnd->state;
        write = walsnd->write;
        flush = walsnd->flush;
        apply = walsnd->apply;
        SpinLockRelease(&walsnd->mutex);

        memset(nulls, 0, sizeof(nulls));
        values[0] = Int32GetDatum(walsnd->pid);

        if (!superuser())
        {
            /*
             * Only superusers can see details. Other users only get the pid
             * value to know it's a walsender, but no details.
             */
            MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
        }
        else
        {
            values[1] = CStringGetTextDatum(WalSndGetStateString(state));

            snprintf(location, sizeof(location), "%X/%X",
                     (uint32) (sentPtr >> 32), (uint32) sentPtr);
            values[2] = CStringGetTextDatum(location);

            if (write == 0)
                nulls[3] = true;
            snprintf(location, sizeof(location), "%X/%X",
                     (uint32) (write >> 32), (uint32) write);
            values[3] = CStringGetTextDatum(location);

            if (flush == 0)
                nulls[4] = true;
            snprintf(location, sizeof(location), "%X/%X",
                     (uint32) (flush >> 32), (uint32) flush);
            values[4] = CStringGetTextDatum(location);

            if (apply == 0)
                nulls[5] = true;
            snprintf(location, sizeof(location), "%X/%X",
                     (uint32) (apply >> 32), (uint32) apply);
            values[5] = CStringGetTextDatum(location);

            values[6] = Int32GetDatum(sync_priority[i]);

            /*
             * More easily understood version of standby state. This is purely
             * informational, not different from priority.
             */
            if (sync_priority[i] == 0)
                values[7] = CStringGetTextDatum("async");
            else if (i == sync_standby)
                values[7] = CStringGetTextDatum("sync");
            else
                values[7] = CStringGetTextDatum("potential");
        }

        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    }
    pfree(sync_priority);

    /* clean up and return the tuplestore */
    tuplestore_donestoring(tupstore);

    return (Datum) 0;
}

void WalSndErrorCleanup ( void   ) 

Definition at line 216 of file walsender.c.

References close, proc_exit(), replication_active, sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.

Referenced by PostgresMain().

{
    if (sendFile >= 0)
    {
        close(sendFile);
        sendFile = -1;
    }

    replication_active = false;
    if (walsender_ready_to_stop)
        proc_exit(0);

    /* Revert back to startup state */
    WalSndSetState(WALSNDSTATE_STARTUP);
}

void WalSndRqstFileReload ( void   ) 

Definition at line 1652 of file walsender.c.

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

{
    int         i;

    for (i = 0; i < max_wal_senders; i++)
    {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

        if (walsnd->pid == 0)
            continue;

        SpinLockAcquire(&walsnd->mutex);
        walsnd->needreload = true;
        SpinLockRelease(&walsnd->mutex);
    }
}

void WalSndShmemInit ( void   ) 

Definition at line 1753 of file walsender.c.

References i, InitSharedLatch(), WalSnd::latch, max_wal_senders, MemSet, WalSnd::mutex, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

{
    bool        found;
    int         i;

    WalSndCtl = (WalSndCtlData *)
        ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);

    if (!found)
    {
        /* First time through, so initialize */
        MemSet(WalSndCtl, 0, WalSndShmemSize());

        for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
            SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));

        for (i = 0; i < max_wal_senders; i++)
        {
            WalSnd     *walsnd = &WalSndCtl->walsnds[i];

            SpinLockInit(&walsnd->mutex);
            InitSharedLatch(&walsnd->latch);
        }
    }
}

Size WalSndShmemSize ( void   ) 

Definition at line 1741 of file walsender.c.

References add_size(), max_wal_senders, mul_size(), and offsetof.

Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().

{
    Size        size = 0;

    size = offsetof(WalSndCtlData, walsnds);
    size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));

    return size;
}

void WalSndSignals ( void   ) 

Definition at line 1717 of file walsender.c.

References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().

Referenced by PostgresMain().

{
    /* Set up signal handlers */
    pqsignal(SIGHUP, WalSndSigHupHandler);      /* set flag to read config
                                                 * file */
    pqsignal(SIGINT, SIG_IGN);  /* not used */
    pqsignal(SIGTERM, die);                     /* request shutdown */
    pqsignal(SIGQUIT, quickdie);                /* hard crash time */
    InitializeTimeouts();       /* establishes SIGALRM handler */
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, WalSndXLogSendHandler);   /* request WAL sending */
    pqsignal(SIGUSR2, WalSndLastCycleHandler);  /* request a last cycle and
                                                 * shutdown */

    /* Reset some signals that are accepted by postmaster but not here */
    pqsignal(SIGCHLD, SIG_DFL);
    pqsignal(SIGTTIN, SIG_DFL);
    pqsignal(SIGTTOU, SIG_DFL);
    pqsignal(SIGCONT, SIG_DFL);
    pqsignal(SIGWINCH, SIG_DFL);
}

void WalSndWakeup ( void   ) 

Definition at line 1786 of file walsender.c.

References i, WalSnd::latch, max_wal_senders, SetLatch(), and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().

{
    int         i;

    for (i = 0; i < max_wal_senders; i++)
        SetLatch(&WalSndCtl->walsnds[i].latch);
}


Variable Documentation

Definition at line 102 of file walsender.c.

Definition at line 97 of file walsender.c.

Referenced by WalSndLoop().