#include "postgres.h"

#include <limits.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>

#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
Go to the source code of this file.
Defines | |
| #define | WALRCV_STARTUP_TIMEOUT 10 |
Functions | |
| Size | WalRcvShmemSize (void) |
| void | WalRcvShmemInit (void) |
| bool | WalRcvRunning (void) |
| bool | WalRcvStreaming (void) |
| void | ShutdownWalRcv (void) |
| void | RequestXLogStreaming (TimeLineID tli, XLogRecPtr recptr, const char *conninfo) |
| XLogRecPtr | GetWalRcvWriteRecPtr (XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) |
| int | GetReplicationApplyDelay (void) |
| int | GetReplicationTransferLatency (void) |
Variables | |
| WalRcvData * | WalRcv = NULL |
| #define WALRCV_STARTUP_TIMEOUT 10 |
Definition at line 39 of file walreceiverfuncs.c.
| int GetReplicationApplyDelay | ( | void | ) |
Definition at line 312 of file walreceiverfuncs.c.
References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, NULL, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and TimestampDifference().
Referenced by ProcessWalSndrMessage().
{
    /*
     * Report, in milliseconds, how long the current chunk of received WAL
     * has been waiting to be replayed.
     *
     * Returns 0 when the received position equals the replay position,
     * -1 when no chunk replay start time is available, and otherwise the
     * elapsed time since that start time, clamped to INT_MAX.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;
    XLogRecPtr  receivePtr;
    XLogRecPtr  replayPtr;
    TimestampTz chunkReplayStartTime;
    long        secs = 0;
    int         usecs = 0;
    int64       delay_ms;

    /* Take a consistent snapshot of the received-up-to position. */
    SpinLockAcquire(&walrcv->mutex);
    receivePtr = walrcv->receivedUpto;
    SpinLockRelease(&walrcv->mutex);

    replayPtr = GetXLogReplayRecPtr(NULL);

    /* Everything received has been replayed: no delay to report. */
    if (receivePtr == replayPtr)
        return 0;

    /*
     * NOTE(review): a zero timestamp is treated here as "no chunk replay
     * start time recorded yet" -- confirm against
     * GetCurrentChunkReplayStartTime().  Without this check the interval
     * below would span the whole timestamp epoch and overflow the int
     * result.
     */
    chunkReplayStartTime = GetCurrentChunkReplayStartTime();
    if (chunkReplayStartTime == 0)
        return -1;

    TimestampDifference(chunkReplayStartTime,
                        GetCurrentTimestamp(),
                        &secs, &usecs);

    /*
     * Do the conversion in 64 bits: multiplying an int by 1000 is signed
     * overflow (undefined behavior) once the delay exceeds roughly 24 days.
     */
    delay_ms = ((int64) secs * 1000) + (usecs / 1000);
    if (delay_ms > INT_MAX)
        delay_ms = INT_MAX;

    return (int) delay_ms;
}
| int GetReplicationTransferLatency | ( | void | ) |
Definition at line 344 of file walreceiverfuncs.c.
References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, and TimestampDifference().
Referenced by ProcessWalSndrMessage().
{
    /*
     * Report, in milliseconds, the interval between the send time and the
     * receipt time stored for the last message in the shared walreceiver
     * state.  The result is clamped to INT_MAX.
     */

    /* use volatile pointer to prevent code rearrangement */
    volatile WalRcvData *walrcv = WalRcv;
    TimestampTz lastMsgSendTime;
    TimestampTz lastMsgReceiptTime;
    long        secs = 0;
    int         usecs = 0;
    int64       latency_ms;

    /* Read both timestamps under the lock so they belong together. */
    SpinLockAcquire(&walrcv->mutex);
    lastMsgSendTime = walrcv->lastMsgSendTime;
    lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
    SpinLockRelease(&walrcv->mutex);

    TimestampDifference(lastMsgSendTime,
                        lastMsgReceiptTime,
                        &secs, &usecs);

    /*
     * Convert in 64 bits and clamp: multiplying an int by 1000 is signed
     * overflow (undefined behavior) for intervals beyond roughly 24 days,
     * which badly skewed clocks could produce.
     */
    latency_ms = ((int64) secs * 1000) + (usecs / 1000);
    if (latency_ms > INT_MAX)
        latency_ms = INT_MAX;

    return (int) latency_ms;
}
| XLogRecPtr GetWalRcvWriteRecPtr | ( | XLogRecPtr * | latestChunkStart, | |
| TimeLineID * | receiveTLI | |||
| ) |
Definition at line 291 of file walreceiverfuncs.c.
References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, and SpinLockRelease.
Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_xlog_receive_location(), and WaitForWALToBecomeAvailable().
{
    /*
     * Return the received-up-to position from shared walreceiver state.
     * When the corresponding output pointer is non-NULL, also hand back
     * the latest chunk start position and the received timeline ID.  All
     * three values are read under one spinlock acquisition.
     */

    /* volatile access keeps the shared reads inside the locked region */
    volatile WalRcvData *shared = WalRcv;
    XLogRecPtr  result;

    SpinLockAcquire(&shared->mutex);

    result = shared->receivedUpto;

    if (latestChunkStart != NULL)
        *latestChunkStart = shared->latestChunkStart;

    if (receiveTLI != NULL)
        *receiveTLI = shared->receivedTLI;

    SpinLockRelease(&shared->mutex);

    return result;
}
| void RequestXLogStreaming | ( | TimeLineID | tli, | |
| XLogRecPtr | recptr, | |||
| const char * | conninfo | |||
| ) |
Definition at line 226 of file walreceiverfuncs.c.
References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, now(), NULL, PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegSize.
Referenced by WaitForWALToBecomeAvailable().
{
    /*
     * Record a streaming request (start position, timeline, connection
     * string) in shared walreceiver state, then either signal the
     * postmaster to start a walreceiver or set the walreceiver's latch,
     * depending on the state found.
     */

    /* volatile access keeps the shared updates inside the locked region */
    volatile WalRcvData *shared = WalRcv;
    pg_time_t   requestTime = (pg_time_t) time(NULL);
    bool        mustLaunch;

    /*
     * Streaming always begins at a segment boundary, so round the requested
     * position down.  Otherwise XLOG streaming could create a broken
     * segment (one with no records in its first part), which might cause
     * trouble later on, e.g. if the segment is archived.  Subtracting a
     * zero remainder is a no-op, so no alignment test is needed.
     */
    recptr -= recptr % XLogSegSize;

    SpinLockAcquire(&shared->mutex);

    /* A restart is only legal from the stopped or waiting state. */
    Assert(shared->walRcvState == WALRCV_STOPPED ||
           shared->walRcvState == WALRCV_WAITING);

    if (conninfo == NULL)
        shared->conninfo[0] = '\0';
    else
        strlcpy((char *) shared->conninfo, conninfo, MAXCONNINFO);

    /* From STOPPED a new process is launched; otherwise it is restarted. */
    mustLaunch = (shared->walRcvState == WALRCV_STOPPED);
    shared->walRcvState = mustLaunch ? WALRCV_STARTING : WALRCV_RESTARTING;
    shared->startTime = requestTime;

    /*
     * On the very first startup (no receive start recorded yet), seed
     * receivedUpto and latestChunkStart with the start position.
     */
    if (shared->receiveStart == 0)
    {
        shared->receivedUpto = recptr;
        shared->latestChunkStart = recptr;
    }
    shared->receiveStart = recptr;
    shared->receiveStartTLI = tli;

    SpinLockRelease(&shared->mutex);

    if (mustLaunch)
        SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
    else
        SetLatch(&shared->latch);
}
| void ShutdownWalRcv | ( | void | ) |
Definition at line 166 of file walreceiverfuncs.c.
References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.
Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().
{
    /*
     * Ask the walreceiver to stop and wait until WalRcvRunning() reports
     * that it is no longer running.
     */

    /* volatile access keeps the shared updates inside the locked region */
    volatile WalRcvData *shared = WalRcv;
    pid_t       targetPid = 0;
    WalRcvState observed;

    /*
     * Request the stop.  The walreceiver switches to WALRCV_STOPPED once
     * it has finished, and also asks the postmaster not to restart it.
     */
    SpinLockAcquire(&shared->mutex);
    observed = shared->walRcvState;
    if (observed == WALRCV_STARTING)
    {
        /* No pid is captured in this state, so just mark it stopped. */
        shared->walRcvState = WALRCV_STOPPED;
    }
    else if (observed == WALRCV_STREAMING ||
             observed == WALRCV_WAITING ||
             observed == WALRCV_RESTARTING ||
             observed == WALRCV_STOPPING)
    {
        /* Move to STOPPING unless a stop is already in progress. */
        if (observed != WALRCV_STOPPING)
            shared->walRcvState = WALRCV_STOPPING;
        targetPid = shared->pid;
    }
    /* WALRCV_STOPPED: nothing to do. */
    SpinLockRelease(&shared->mutex);

    /* Deliver SIGTERM if a process id was captured above. */
    if (targetPid != 0)
        kill(targetPid, SIGTERM);

    /*
     * Poll until the walreceiver acknowledges its death by setting the
     * state to WALRCV_STOPPED.
     */
    for (;;)
    {
        if (!WalRcvRunning())
            break;

        /* The wait may be long, so keep servicing startup interrupts. */
        HandleStartupProcInterrupts();

        pg_usleep(100000);      /* 100ms */
    }
}
| bool WalRcvRunning | ( | void | ) |
Definition at line 73 of file walreceiverfuncs.c.
References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_STARTING, WALRCV_STOPPED, and WalRcvData::walRcvState.
Referenced by ShutdownWalRcv().
{
    /*
     * Return true unless the walreceiver state is WALRCV_STOPPED.  A
     * walreceiver that has been in WALRCV_STARTING for more than
     * WALRCV_STARTUP_TIMEOUT seconds is forced to WALRCV_STOPPED first.
     */

    /* volatile access keeps the shared reads inside the locked region */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState curState;
    pg_time_t   launchedAt;

    SpinLockAcquire(&shared->mutex);
    curState = shared->walRcvState;
    launchedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * Give up on a walreceiver that is taking too long to start.  Marking
     * it STOPPED ensures that if it does come up later, it will see that
     * it is not supposed to be running and die without doing anything.
     */
    if (curState == WALRCV_STARTING &&
        ((pg_time_t) time(NULL) - launchedAt) > WALRCV_STARTUP_TIMEOUT)
    {
        SpinLockAcquire(&shared->mutex);
        /* Re-check under the lock: the state may have changed meanwhile. */
        if (shared->walRcvState == WALRCV_STARTING)
        {
            shared->walRcvState = WALRCV_STOPPED;
            curState = WALRCV_STOPPED;
        }
        SpinLockRelease(&shared->mutex);
    }

    return curState != WALRCV_STOPPED;
}
| void WalRcvShmemInit | ( | void | ) |
Definition at line 54 of file walreceiverfuncs.c.
References InitSharedLatch(), WalRcvData::latch, MemSet, WalRcvData::mutex, ShmemInitStruct(), SpinLockInit, WalRcvShmemSize(), and WalRcvData::walRcvState.
Referenced by CreateSharedMemoryAndSemaphores().
{
    /*
     * Attach to the "Wal Receiver Ctl" shared-memory structure, creating
     * it if needed, and initialize it when this call is the one that
     * created it.
     */
    Size        ctlSize = WalRcvShmemSize();
    bool        alreadyExists;

    WalRcv = (WalRcvData *)
        ShmemInitStruct("Wal Receiver Ctl", ctlSize, &alreadyExists);

    /* The structure was set up earlier; nothing more to do. */
    if (alreadyExists)
        return;

    /* First time through: zero everything, then set the non-zero fields. */
    MemSet(WalRcv, 0, ctlSize);
    WalRcv->walRcvState = WALRCV_STOPPED;
    SpinLockInit(&WalRcv->mutex);
    InitSharedLatch(&WalRcv->latch);
}
| Size WalRcvShmemSize | ( | void | ) |
Definition at line 43 of file walreceiverfuncs.c.
References add_size().
Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().
{
    /*
     * Report the shared memory needed for walreceiver state: a single
     * WalRcvData struct, accumulated through add_size() from zero.
     */
    return add_size(0, sizeof(WalRcvData));
}
| bool WalRcvStreaming | ( | void | ) |
Definition at line 119 of file walreceiverfuncs.c.
References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STREAMING, and WalRcvData::walRcvState.
Referenced by WaitForWALToBecomeAvailable().
{
    /*
     * Return true when the walreceiver state is WALRCV_STREAMING,
     * WALRCV_STARTING or WALRCV_RESTARTING.  A walreceiver that has been
     * in WALRCV_STARTING for more than WALRCV_STARTUP_TIMEOUT seconds is
     * forced to WALRCV_STOPPED first.
     */

    /* volatile access keeps the shared reads inside the locked region */
    volatile WalRcvData *shared = WalRcv;
    WalRcvState curState;
    pg_time_t   launchedAt;

    SpinLockAcquire(&shared->mutex);
    curState = shared->walRcvState;
    launchedAt = shared->startTime;
    SpinLockRelease(&shared->mutex);

    /*
     * Give up on a walreceiver that is taking too long to start.  Marking
     * it STOPPED ensures that if it does come up later, it will see that
     * it is not supposed to be running and die without doing anything.
     */
    if (curState == WALRCV_STARTING)
    {
        pg_time_t   elapsed = (pg_time_t) time(NULL) - launchedAt;

        if (elapsed > WALRCV_STARTUP_TIMEOUT)
        {
            SpinLockAcquire(&shared->mutex);
            /* Re-check under the lock before overriding the state. */
            if (shared->walRcvState == WALRCV_STARTING)
            {
                shared->walRcvState = WALRCV_STOPPED;
                curState = WALRCV_STOPPED;
            }
            SpinLockRelease(&shared->mutex);
        }
    }

    return (curState == WALRCV_STREAMING ||
            curState == WALRCV_STARTING ||
            curState == WALRCV_RESTARTING);
}
| WalRcvData* WalRcv = NULL |
Definition at line 33 of file walreceiverfuncs.c.
Referenced by ProcessWalSndrMessage(), WalRcvDie(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), WalReceiverMain(), and XLogWalRcvFlush().
1.7.1