Header And Logo

PostgreSQL
| The world's most advanced open source database.

Defines | Functions | Variables

walreceiverfuncs.c File Reference

#include "postgres.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>
#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Include dependency graph for walreceiverfuncs.c:

Go to the source code of this file.

Defines

#define WALRCV_STARTUP_TIMEOUT   10

Functions

Size WalRcvShmemSize (void)
void WalRcvShmemInit (void)
bool WalRcvRunning (void)
bool WalRcvStreaming (void)
void ShutdownWalRcv (void)
void RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
XLogRecPtr GetWalRcvWriteRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
int GetReplicationApplyDelay (void)
int GetReplicationTransferLatency (void)

Variables

WalRcvData * WalRcv = NULL

Define Documentation

#define WALRCV_STARTUP_TIMEOUT   10

Definition at line 39 of file walreceiverfuncs.c.


Function Documentation

int GetReplicationApplyDelay ( void   ) 

Definition at line 312 of file walreceiverfuncs.c.

References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, NULL, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and TimestampDifference().

Referenced by ProcessWalSndrMessage().

{
    /*
     * Report the replication apply delay in milliseconds.
     *
     * Returns 0 when the replay position has caught up with the received
     * position, -1 when no chunk replay start time is available, and
     * otherwise the time elapsed between the current chunk's replay start
     * time and now.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;

    XLogRecPtr  receivePtr;
    XLogRecPtr  replayPtr;
    TimestampTz chunkReplayStartTime;

    long        secs;
    int         usecs;

    /* Read the shared receive position under the spinlock. */
    SpinLockAcquire(&walrcv->mutex);
    receivePtr = walrcv->receivedUpto;
    SpinLockRelease(&walrcv->mutex);

    replayPtr = GetXLogReplayRecPtr(NULL);

    /* Everything received has been replayed: no delay to report. */
    if (receivePtr == replayPtr)
        return 0;

    chunkReplayStartTime = GetCurrentChunkReplayStartTime();

    /*
     * A zero start time would make the difference below span the whole
     * timestamp epoch, and the conversion to milliseconds would overflow
     * int.  Report "not available" instead.
     *
     * NOTE(review): assumes GetCurrentChunkReplayStartTime() yields 0 until
     * a chunk has started replaying -- confirm against xlog.c.
     */
    if (chunkReplayStartTime == 0)
        return -1;

    TimestampDifference(chunkReplayStartTime,
                        GetCurrentTimestamp(),
                        &secs, &usecs);

    return (((int) secs * 1000) + (usecs / 1000));
}

int GetReplicationTransferLatency ( void   ) 

Definition at line 344 of file walreceiverfuncs.c.

References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, and TimestampDifference().

Referenced by ProcessWalSndrMessage().

{
    /*
     * Report, in milliseconds, the difference between the send time and the
     * receipt time recorded for the last message in shared memory.
     */

    /* volatile keeps the shared-memory accesses between the lock calls */
    volatile WalRcvData *shared = WalRcv;

    TimestampTz sentAt;
    TimestampTz receivedAt;
    long        diffSecs = 0;
    int         diffUsecs = 0;

    /* Snapshot both timestamps in one critical section. */
    SpinLockAcquire(&shared->mutex);
    sentAt = shared->lastMsgSendTime;
    receivedAt = shared->lastMsgReceiptTime;
    SpinLockRelease(&shared->mutex);

    TimestampDifference(sentAt, receivedAt, &diffSecs, &diffUsecs);

    /* Fold seconds and microseconds into whole milliseconds. */
    return ((int) diffSecs * 1000) + (diffUsecs / 1000);
}

XLogRecPtr GetWalRcvWriteRecPtr ( XLogRecPtr *  latestChunkStart,
TimeLineID *  receiveTLI 
)

Definition at line 291 of file walreceiverfuncs.c.

References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, and SpinLockRelease.

Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_xlog_receive_location(), and WaitForWALToBecomeAvailable().

{
    /*
     * Return the receivedUpto position stored in shared memory.
     *
     * latestChunkStart and receiveTLI are optional output parameters; each
     * one is filled in only when the caller passes a non-NULL pointer.
     */

    /* volatile keeps the shared-memory accesses between the lock calls */
    volatile WalRcvData *shared = WalRcv;
    XLogRecPtr  result;

    /* Read all requested fields in a single critical section. */
    SpinLockAcquire(&shared->mutex);

    result = shared->receivedUpto;

    if (latestChunkStart != NULL)
        *latestChunkStart = shared->latestChunkStart;

    if (receiveTLI != NULL)
        *receiveTLI = shared->receivedTLI;

    SpinLockRelease(&shared->mutex);

    return result;
}

void RequestXLogStreaming ( TimeLineID  tli,
XLogRecPtr  recptr,
const char *  conninfo 
)

Definition at line 226 of file walreceiverfuncs.c.

References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, now(), NULL, PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegSize.

Referenced by WaitForWALToBecomeAvailable().

{
    /*
     * Ask for WAL streaming to begin at the segment containing recptr on
     * timeline tli, using conninfo (an empty string when conninfo is NULL).
     *
     * If walreceiver is stopped, the postmaster is signalled to launch it;
     * otherwise walreceiver's latch is set.
     */

    /* volatile keeps the shared-memory accesses between the lock calls */
    volatile WalRcvData *shared = WalRcv;
    bool        needLaunch = false;
    pg_time_t   requestTime = (pg_time_t) time(NULL);

    /*
     * Round the start position down to a segment boundary.  Always starting
     * at the beginning of the segment prevents XLOG streaming from creating
     * a broken segment (i.e., one with no records in its first half), which
     * might cause trouble later on if the segment is e.g. archived.
     * Subtracting a zero remainder is a no-op, so no alignment test is
     * needed.
     */
    recptr -= recptr % XLogSegSize;

    SpinLockAcquire(&shared->mutex);

    /* A (re)start request is only valid while stopped or waiting */
    Assert(shared->walRcvState == WALRCV_STOPPED ||
           shared->walRcvState == WALRCV_WAITING);

    /* Publish the connection string, or clear it if none was given. */
    if (conninfo == NULL)
        shared->conninfo[0] = '\0';
    else
        strlcpy((char *) shared->conninfo, conninfo, MAXCONNINFO);

    /* A stopped walreceiver must be launched; a waiting one is restarted. */
    needLaunch = (shared->walRcvState == WALRCV_STOPPED);
    if (needLaunch)
        shared->walRcvState = WALRCV_STARTING;
    else
        shared->walRcvState = WALRCV_RESTARTING;
    shared->startTime = requestTime;

    /*
     * On the very first startup (receiveStart still zero), seed
     * receivedUpto and latestChunkStart with the new start position.
     */
    if (shared->receiveStart == 0)
    {
        shared->receivedUpto = recptr;
        shared->latestChunkStart = recptr;
    }
    shared->receiveStart = recptr;
    shared->receiveStartTLI = tli;

    SpinLockRelease(&shared->mutex);

    /* Notify whoever has to act on the request. */
    if (needLaunch)
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
    else
        SetLatch(&shared->latch);
}

void ShutdownWalRcv ( void   ) 

Definition at line 166 of file walreceiverfuncs.c.

References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.

Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().

{
    /*
     * Stop walreceiver (if it is running) and wait until WalRcvRunning()
     * reports that it is no longer running.
     */

    /* volatile keeps the shared-memory accesses between the lock calls */
    volatile WalRcvData *shared = WalRcv;
    pid_t       targetPid = 0;

    /*
     * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     * mode once it's finished, and will also request postmaster to not
     * restart itself.
     */
    SpinLockAcquire(&shared->mutex);
    switch (shared->walRcvState)
    {
        case WALRCV_STOPPED:
            /* already stopped: nothing to request */
            break;

        case WALRCV_STARTING:
            /* not running yet: mark it stopped without signalling */
            shared->walRcvState = WALRCV_STOPPED;
            break;

        case WALRCV_STREAMING:
        case WALRCV_WAITING:
        case WALRCV_RESTARTING:
            /* running: flag the stop request and remember whom to signal */
            shared->walRcvState = WALRCV_STOPPING;
            targetPid = shared->pid;
            break;

        case WALRCV_STOPPING:
            /* stop already requested: just signal it again */
            targetPid = shared->pid;
            break;
    }
    SpinLockRelease(&shared->mutex);

    /* Signal the walreceiver process only if a pid was recorded above. */
    if (targetPid != 0)
        kill(targetPid, SIGTERM);

    /*
     * Wait for walreceiver to acknowledge its death by setting state to
     * WALRCV_STOPPED.
     */
    for (;;)
    {
        if (!WalRcvRunning())
            break;

        /*
         * This loop may run for a long time, so keep servicing the startup
         * process's interrupts while waiting.
         */
        HandleStartupProcInterrupts();

        pg_usleep(100000);      /* 100ms */
    }
}

bool WalRcvRunning ( void   ) 

Definition at line 73 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_STARTING, WALRCV_STOPPED, and WalRcvData::walRcvState.

Referenced by ShutdownWalRcv().

{
    /*
     * Report whether walreceiver is in any state other than WALRCV_STOPPED.
     *
     * As a side effect, a walreceiver that has stayed in WALRCV_STARTING for
     * more than WALRCV_STARTUP_TIMEOUT seconds is marked WALRCV_STOPPED.
     */

    /* volatile keeps the shared-memory accesses between the lock calls */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState observed;
    pg_time_t   launchedAt;

    /* Take a consistent snapshot of state and start time. */
    SpinLockAcquire(&shared->mutex);
    observed = shared->walRcvState;
    launchedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * If it has taken too long for walreceiver to start up, give up. Setting
     * the state to STOPPED ensures that if walreceiver later does start up
     * after all, it will see that it's not supposed to be running and die
     * without doing anything.
     */
    if (observed == WALRCV_STARTING &&
        ((pg_time_t) time(NULL) - launchedAt) > WALRCV_STARTUP_TIMEOUT)
    {
        SpinLockAcquire(&shared->mutex);

        /* Re-check under the lock: the state may have changed meanwhile. */
        if (shared->walRcvState == WALRCV_STARTING)
        {
            shared->walRcvState = WALRCV_STOPPED;
            observed = WALRCV_STOPPED;
        }

        SpinLockRelease(&shared->mutex);
    }

    return (observed != WALRCV_STOPPED);
}

void WalRcvShmemInit ( void   ) 

Definition at line 54 of file walreceiverfuncs.c.

References InitSharedLatch(), WalRcvData::latch, MemSet, WalRcvData::mutex, ShmemInitStruct(), SpinLockInit, WalRcvShmemSize(), and WalRcvData::walRcvState.

Referenced by CreateSharedMemoryAndSemaphores().

{
    /*
     * Attach to the "Wal Receiver Ctl" shared-memory struct, initializing it
     * if ShmemInitStruct() reports that it did not exist yet.
     */
    bool        alreadyExists;
    Size        ctlSize = WalRcvShmemSize();

    WalRcv = (WalRcvData *)
        ShmemInitStruct("Wal Receiver Ctl", ctlSize, &alreadyExists);

    /* Nothing more to do when the struct was already set up. */
    if (alreadyExists)
        return;

    /* First time through: zero the struct, then set the non-zero parts. */
    MemSet(WalRcv, 0, ctlSize);
    WalRcv->walRcvState = WALRCV_STOPPED;
    SpinLockInit(&WalRcv->mutex);
    InitSharedLatch(&WalRcv->latch);
}

Size WalRcvShmemSize ( void   ) 

Definition at line 43 of file walreceiverfuncs.c.

References add_size().

Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().

{
    /*
     * Report the shared memory needed by walreceiver: a single WalRcvData
     * struct, accumulated through add_size() starting from zero.
     */
    return add_size(0, sizeof(WalRcvData));
}

bool WalRcvStreaming ( void   ) 

Definition at line 119 of file walreceiverfuncs.c.

References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STREAMING, and WalRcvData::walRcvState.

Referenced by WaitForWALToBecomeAvailable().

{
    /*
     * Report whether walreceiver is in WALRCV_STREAMING, WALRCV_STARTING or
     * WALRCV_RESTARTING state.
     *
     * As a side effect, a walreceiver that has stayed in WALRCV_STARTING for
     * more than WALRCV_STARTUP_TIMEOUT seconds is marked WALRCV_STOPPED.
     */

    /* volatile keeps the shared-memory accesses between the lock calls */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState observed;
    pg_time_t   launchedAt;

    /* Take a consistent snapshot of state and start time. */
    SpinLockAcquire(&shared->mutex);
    observed = shared->walRcvState;
    launchedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * If it has taken too long for walreceiver to start up, give up. Setting
     * the state to STOPPED ensures that if walreceiver later does start up
     * after all, it will see that it's not supposed to be running and die
     * without doing anything.
     */
    if (observed == WALRCV_STARTING &&
        ((pg_time_t) time(NULL) - launchedAt) > WALRCV_STARTUP_TIMEOUT)
    {
        SpinLockAcquire(&shared->mutex);

        /* Re-check under the lock: the state may have changed meanwhile. */
        if (shared->walRcvState == WALRCV_STARTING)
        {
            shared->walRcvState = WALRCV_STOPPED;
            observed = WALRCV_STOPPED;
        }

        SpinLockRelease(&shared->mutex);
    }

    switch (observed)
    {
        case WALRCV_STREAMING:
        case WALRCV_STARTING:
        case WALRCV_RESTARTING:
            return true;
        default:
            return false;
    }
}


Variable Documentation

WalRcvData* WalRcv = NULL