#include "postgres.h"

#include <limits.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
#include <signal.h>

#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Go to the source code of this file.
Defines | |
#define | WALRCV_STARTUP_TIMEOUT 10 |
Functions | |
Size | WalRcvShmemSize (void) |
void | WalRcvShmemInit (void) |
bool | WalRcvRunning (void) |
bool | WalRcvStreaming (void) |
void | ShutdownWalRcv (void) |
void | RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo) |
XLogRecPtr | GetWalRcvWriteRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) |
int | GetReplicationApplyDelay (void) |
int | GetReplicationTransferLatency (void) |
Variables | |
WalRcvData * | WalRcv = NULL |
#define WALRCV_STARTUP_TIMEOUT 10
Definition at line 39 of file walreceiverfuncs.c.
int GetReplicationApplyDelay(void)
Definition at line 312 of file walreceiverfuncs.c.
References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, NULL, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and TimestampDifference().
Referenced by ProcessWalSndrMessage().
{
    /*
     * Report, in milliseconds, how long WAL replay has been lagging behind
     * WAL receipt.
     *
     * Returns 0 when the walreceiver's receivedUpto position equals the
     * current replay position.  Otherwise returns the time elapsed between
     * GetCurrentChunkReplayStartTime() and now, saturated at INT_MAX.
     *
     * NOTE(review): if no chunk replay start time has been recorded yet the
     * start timestamp is presumably zero, which yields a very large
     * difference -- confirm against GetCurrentChunkReplayStartTime().  The
     * saturation below keeps that case well defined.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;
    XLogRecPtr  receivePtr;
    XLogRecPtr  replayPtr;
    long        secs;
    int         usecs;
    int64       delayMs;

    /* receivedUpto is shared state; read it under the walreceiver spinlock */
    SpinLockAcquire(&walrcv->mutex);
    receivePtr = walrcv->receivedUpto;
    SpinLockRelease(&walrcv->mutex);

    replayPtr = GetXLogReplayRecPtr(NULL);

    /* nothing waiting to be applied */
    if (receivePtr == replayPtr)
        return 0;

    TimestampDifference(GetCurrentChunkReplayStartTime(),
                        GetCurrentTimestamp(),
                        &secs, &usecs);

    /*
     * Do the conversion in 64 bits: "(int) secs * 1000" overflows a signed
     * int (undefined behavior) once the delay exceeds roughly 24.8 days.
     */
    delayMs = ((int64) secs * 1000) + (usecs / 1000);
    if (delayMs > INT_MAX)
        return INT_MAX;

    return (int) delayMs;
}
int GetReplicationTransferLatency(void)
Definition at line 344 of file walreceiverfuncs.c.
References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, and TimestampDifference().
Referenced by ProcessWalSndrMessage().
{
    /*
     * Report, in milliseconds, the difference between the send time and the
     * receipt time of the last message recorded in the walreceiver's shared
     * state (lastMsgSendTime and lastMsgReceiptTime).
     *
     * The result is saturated at INT_MAX.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;
    TimestampTz lastMsgSendTime;
    TimestampTz lastMsgReceiptTime;
    long        secs = 0;
    int         usecs = 0;
    int64       latencyMs;

    /* copy both timestamps under one hold of the lock so they are a pair */
    SpinLockAcquire(&walrcv->mutex);
    lastMsgSendTime = walrcv->lastMsgSendTime;
    lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
    SpinLockRelease(&walrcv->mutex);

    TimestampDifference(lastMsgSendTime,
                        lastMsgReceiptTime,
                        &secs, &usecs);

    /*
     * Do the conversion in 64 bits: "(int) secs * 1000" overflows a signed
     * int (undefined behavior) when the two timestamps are more than roughly
     * 24.8 days apart, e.g. because of badly skewed clocks.
     */
    latencyMs = ((int64) secs * 1000) + (usecs / 1000);
    if (latencyMs > INT_MAX)
        return INT_MAX;

    return (int) latencyMs;
}
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
Definition at line 291 of file walreceiverfuncs.c.
References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, and SpinLockRelease.
Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_xlog_receive_location(), and WaitForWALToBecomeAvailable().
{
    /*
     * Return the walreceiver's receivedUpto position from shared memory.
     *
     * If latestChunkStart is not NULL, *latestChunkStart receives the shared
     * latestChunkStart value; if receiveTLI is not NULL, *receiveTLI
     * receives the shared receivedTLI value.  All fields are read during a
     * single hold of the walreceiver spinlock.
     */

    /* access shared memory through a volatile pointer so that the reads are
     * not moved around the spinlock calls */
    volatile WalRcvData *shared = WalRcv;
    XLogRecPtr  result;

    SpinLockAcquire(&shared->mutex);

    result = shared->receivedUpto;

    if (latestChunkStart != NULL)
    {
        *latestChunkStart = shared->latestChunkStart;
    }

    if (receiveTLI != NULL)
    {
        *receiveTLI = shared->receivedTLI;
    }

    SpinLockRelease(&shared->mutex);

    return result;
}
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
Definition at line 226 of file walreceiverfuncs.c.
References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, now(), NULL, PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegSize.
Referenced by WaitForWALToBecomeAvailable().
{
    /*
     * Ask for WAL streaming to begin at (tli, recptr) using conninfo.
     *
     * The request is recorded in the walreceiver's shared state.  If the
     * walreceiver was in WALRCV_STOPPED state the postmaster is signalled
     * with PMSIGNAL_START_WALRECEIVER; otherwise the state becomes
     * WALRCV_RESTARTING and the walreceiver's latch is set.
     *
     * A NULL conninfo is stored as an empty string.
     */

    /* access shared memory through a volatile pointer so that the accesses
     * are not moved around the spinlock calls */
    volatile WalRcvData *shared = WalRcv;
    pg_time_t   requestTime = (pg_time_t) time(NULL);
    bool        needLaunch;

    /*
     * Always begin streaming at a segment boundary: round recptr down to a
     * multiple of XLogSegSize.  Starting mid-segment would let streaming
     * create a segment file whose beginning holds no records, which might
     * cause trouble later on if that segment is e.g. archived.
     */
    recptr -= recptr % XLogSegSize;

    SpinLockAcquire(&shared->mutex);

    /* a (re)start request is only valid while stopped or waiting */
    Assert(shared->walRcvState == WALRCV_STOPPED ||
           shared->walRcvState == WALRCV_WAITING);

    if (conninfo == NULL)
        shared->conninfo[0] = '\0';
    else
        strlcpy((char *) shared->conninfo, conninfo, MAXCONNINFO);

    /* a stopped walreceiver must be launched; a waiting one is restarted */
    needLaunch = (shared->walRcvState == WALRCV_STOPPED);
    if (needLaunch)
        shared->walRcvState = WALRCV_STARTING;
    else
        shared->walRcvState = WALRCV_RESTARTING;

    shared->startTime = requestTime;

    /*
     * receiveStart == 0 means this is the walreceiver's first startup, so
     * seed receivedUpto and latestChunkStart with the start position.
     */
    if (shared->receiveStart == 0)
    {
        shared->receivedUpto = recptr;
        shared->latestChunkStart = recptr;
    }

    shared->receiveStart = recptr;
    shared->receiveStartTLI = tli;

    SpinLockRelease(&shared->mutex);

    if (needLaunch)
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
    else
        SetLatch(&shared->latch);
}
void ShutdownWalRcv(void)
Definition at line 166 of file walreceiverfuncs.c.
References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.
Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().
{
    /*
     * Stop the walreceiver and wait until WalRcvRunning() reports that it is
     * no longer running.
     */

    /* access shared memory through a volatile pointer so that the accesses
     * are not moved around the spinlock calls */
    volatile WalRcvData *shared = WalRcv;
    pid_t       targetPid = 0;

    /*
     * Ask the walreceiver to stop.  Once it has finished it switches itself
     * to WALRCV_STOPPED and also asks the postmaster not to restart it.
     */
    SpinLockAcquire(&shared->mutex);
    switch (shared->walRcvState)
    {
        case WALRCV_STARTING:
            /* marked as starting: flip the state straight to stopped */
            shared->walRcvState = WALRCV_STOPPED;
            break;

        case WALRCV_STREAMING:
        case WALRCV_WAITING:
        case WALRCV_RESTARTING:
            /* running: mark it as stopping and remember whom to signal */
            shared->walRcvState = WALRCV_STOPPING;
            targetPid = shared->pid;
            break;

        case WALRCV_STOPPING:
            /* already stopping: just remember whom to signal */
            targetPid = shared->pid;
            break;

        case WALRCV_STOPPED:
            /* nothing to do */
            break;
    }
    SpinLockRelease(&shared->mutex);

    /* Send SIGTERM only if a pid was picked up above. */
    if (targetPid != 0)
        kill(targetPid, SIGTERM);

    /*
     * Poll until the walreceiver acknowledges its death by setting the state
     * to WALRCV_STOPPED.  The wait may be long, so the startup process's
     * interrupts are serviced on every iteration.
     */
    for (;;)
    {
        if (!WalRcvRunning())
            break;

        HandleStartupProcInterrupts();

        pg_usleep(100000);      /* 100ms */
    }
}
bool WalRcvRunning(void)
Definition at line 73 of file walreceiverfuncs.c.
References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_STARTING, WALRCV_STOPPED, and WalRcvData::walRcvState.
Referenced by ShutdownWalRcv().
{
    /*
     * Return true unless the walreceiver's shared state is WALRCV_STOPPED.
     *
     * Side effect: a walreceiver that has been in WALRCV_STARTING state for
     * more than WALRCV_STARTUP_TIMEOUT seconds is marked WALRCV_STOPPED.
     */

    /* access shared memory through a volatile pointer so that the accesses
     * are not moved around the spinlock calls */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState curState;
    pg_time_t   startedAt;

    SpinLockAcquire(&shared->mutex);
    curState = shared->walRcvState;
    startedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * Give up on a walreceiver that is taking too long to start.  Marking it
     * STOPPED ensures that, should it come up after all, it will see that it
     * is not supposed to be running and die without doing anything.
     */
    if (curState == WALRCV_STARTING &&
        ((pg_time_t) time(NULL) - startedAt) > WALRCV_STARTUP_TIMEOUT)
    {
        SpinLockAcquire(&shared->mutex);

        /* re-check under the lock: the state may have changed meanwhile */
        if (shared->walRcvState == WALRCV_STARTING)
        {
            shared->walRcvState = WALRCV_STOPPED;
            curState = WALRCV_STOPPED;
        }

        SpinLockRelease(&shared->mutex);
    }

    return curState != WALRCV_STOPPED;
}
void WalRcvShmemInit(void)
Definition at line 54 of file walreceiverfuncs.c.
References InitSharedLatch(), WalRcvData::latch, MemSet, WalRcvData::mutex, ShmemInitStruct(), SpinLockInit, WalRcvShmemSize(), and WalRcvData::walRcvState.
Referenced by CreateSharedMemoryAndSemaphores().
{
    /*
     * Attach to the "Wal Receiver Ctl" shared-memory structure and point
     * WalRcv at it.  If ShmemInitStruct() reports that the structure did not
     * exist yet, initialize it: zero it, set the state to WALRCV_STOPPED,
     * and initialize its spinlock and latch.
     */
    bool        alreadyExists;

    WalRcv = (WalRcvData *) ShmemInitStruct("Wal Receiver Ctl",
                                            WalRcvShmemSize(),
                                            &alreadyExists);

    /* someone else has already set the structure up */
    if (alreadyExists)
        return;

    /* First time through, so initialize */
    MemSet(WalRcv, 0, WalRcvShmemSize());
    WalRcv->walRcvState = WALRCV_STOPPED;
    SpinLockInit(&WalRcv->mutex);
    InitSharedLatch(&WalRcv->latch);
}
Size WalRcvShmemSize(void)
Definition at line 43 of file walreceiverfuncs.c.
References add_size().
Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().
{
    /*
     * Report the amount of shared memory the walreceiver needs: a single
     * WalRcvData structure, summed via add_size().
     */
    const Size  base = 0;

    return add_size(base, sizeof(WalRcvData));
}
bool WalRcvStreaming(void)
Definition at line 119 of file walreceiverfuncs.c.
References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STREAMING, and WalRcvData::walRcvState.
Referenced by WaitForWALToBecomeAvailable().
{
    /*
     * Return true if the walreceiver's shared state is WALRCV_STREAMING,
     * WALRCV_STARTING or WALRCV_RESTARTING; false otherwise.
     *
     * Side effect: a walreceiver that has been in WALRCV_STARTING state for
     * more than WALRCV_STARTUP_TIMEOUT seconds is marked WALRCV_STOPPED.
     */

    /* access shared memory through a volatile pointer so that the accesses
     * are not moved around the spinlock calls */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState curState;
    pg_time_t   startedAt;

    SpinLockAcquire(&shared->mutex);
    curState = shared->walRcvState;
    startedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * Give up on a walreceiver that is taking too long to start.  Marking it
     * STOPPED ensures that, should it come up after all, it will see that it
     * is not supposed to be running and die without doing anything.
     */
    if (curState == WALRCV_STARTING &&
        ((pg_time_t) time(NULL) - startedAt) > WALRCV_STARTUP_TIMEOUT)
    {
        SpinLockAcquire(&shared->mutex);

        /* re-check under the lock: the state may have changed meanwhile */
        if (shared->walRcvState == WALRCV_STARTING)
        {
            shared->walRcvState = WALRCV_STOPPED;
            curState = WALRCV_STOPPED;
        }

        SpinLockRelease(&shared->mutex);
    }

    if (curState == WALRCV_STREAMING)
        return true;
    if (curState == WALRCV_STARTING)
        return true;
    if (curState == WALRCV_RESTARTING)
        return true;

    return false;
}
WalRcvData *WalRcv = NULL
Definition at line 33 of file walreceiverfuncs.c.
Referenced by ProcessWalSndrMessage(), WalRcvDie(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), WalReceiverMain(), and XLogWalRcvFlush().