#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
Go to the source code of this file.
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
Definition at line 34 of file walreceiver.h.
Referenced by StartupXLOG(), and XLogWalRcvFlush().
#define MAXCONNINFO 1024
Definition at line 31 of file walreceiver.h.
Referenced by libpqrcv_connect(), RequestXLogStreaming(), and WalReceiverMain().
typedef void(* walrcv_connect_type)(char *conninfo)
Definition at line 119 of file walreceiver.h.
typedef void(* walrcv_disconnect_type)(void)
Definition at line 140 of file walreceiver.h.
typedef void(* walrcv_endstreaming_type)(TimeLineID *next_tli)
Definition at line 131 of file walreceiver.h.
typedef void(* walrcv_identify_system_type)(TimeLineID *primary_tli)
Definition at line 122 of file walreceiver.h.
typedef void(* walrcv_readtimelinehistoryfile_type)(TimeLineID tli, char **filename, char **content, int *size)
Definition at line 125 of file walreceiver.h.
typedef int(* walrcv_receive_type)(int timeout, char **buffer)
Definition at line 134 of file walreceiver.h.
typedef void(* walrcv_send_type)(const char *buffer, int nbytes)
Definition at line 137 of file walreceiver.h.
typedef bool(* walrcv_startstreaming_type)(TimeLineID tli, XLogRecPtr startpoint)
Definition at line 128 of file walreceiver.h.
enum WalRcvState
WALRCV_STOPPED
WALRCV_STARTING
WALRCV_STREAMING
WALRCV_WAITING
WALRCV_RESTARTING
WALRCV_STOPPING
Definition at line 39 of file walreceiver.h.
{
	/* Not running, and must not be started up again. */
	WALRCV_STOPPED,

	/* Launched, but the process has not initialized yet. */
	WALRCV_STARTING,

	/* The walreceiver is streaming. */
	WALRCV_STREAMING,

	/* Streaming has stopped; waiting for orders. */
	WALRCV_WAITING,

	/* Has been asked to restart streaming. */
	WALRCV_RESTARTING,

	/* A stop was requested, but the process is still running. */
	WALRCV_STOPPING
} WalRcvState;
int GetReplicationApplyDelay(void)
Definition at line 312 of file walreceiverfuncs.c.
References GetCurrentChunkReplayStartTime(), GetCurrentTimestamp(), GetXLogReplayRecPtr(), WalRcvData::mutex, NULL, WalRcvData::receivedUpto, SpinLockAcquire, SpinLockRelease, and TimestampDifference().
Referenced by ProcessWalSndrMessage().
{
	/*
	 * Report the replication apply delay in milliseconds.
	 *
	 * Returns 0 when the replay position equals the received position,
	 * -1 when no chunk replay start time is available to measure from,
	 * and otherwise the time elapsed since the current chunk's replay
	 * start time.
	 */

	/* use volatile pointer to prevent code rearrangement */
	volatile WalRcvData *walrcv = WalRcv;
	XLogRecPtr	receivePtr;
	XLogRecPtr	replayPtr;
	TimestampTz chunkReplayStartTime;
	long		secs;
	int			usecs;

	/* Take a consistent copy of the received-up-to position. */
	SpinLockAcquire(&walrcv->mutex);
	receivePtr = walrcv->receivedUpto;
	SpinLockRelease(&walrcv->mutex);

	replayPtr = GetXLogReplayRecPtr(NULL);

	/* Replay has caught up with what was received: nothing is pending. */
	if (receivePtr == replayPtr)
		return 0;

	/*
	 * NOTE(review): assumes GetCurrentChunkReplayStartTime() yields 0 while
	 * no chunk replay start time has been recorded -- confirm against its
	 * definition.  Measuring from a zero timestamp would produce an interval
	 * so large that the (int) millisecond conversion below overflows, so
	 * report "unknown" instead of a garbage delay.
	 */
	chunkReplayStartTime = GetCurrentChunkReplayStartTime();
	if (chunkReplayStartTime == 0)
		return -1;

	TimestampDifference(chunkReplayStartTime,
						GetCurrentTimestamp(),
						&secs, &usecs);

	/* Convert the seconds/microseconds pair to whole milliseconds. */
	return (((int) secs * 1000) + (usecs / 1000));
}
int GetReplicationTransferLatency(void)
Definition at line 344 of file walreceiverfuncs.c.
References WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::mutex, SpinLockAcquire, SpinLockRelease, and TimestampDifference().
Referenced by ProcessWalSndrMessage().
{
	/*
	 * Report, in milliseconds, the difference between the send time and the
	 * receipt time recorded for the last message in shared memory.
	 */

	/* volatile pointer keeps the shared-memory accesses from being rearranged */
	volatile WalRcvData *shared = WalRcv;
	TimestampTz sentAt;
	TimestampTz receivedAt;
	long		diffSecs = 0;
	int			diffUsecs = 0;

	/* Copy both timestamps under the spinlock so they belong together. */
	SpinLockAcquire(&shared->mutex);
	sentAt = shared->lastMsgSendTime;
	receivedAt = shared->lastMsgReceiptTime;
	SpinLockRelease(&shared->mutex);

	TimestampDifference(sentAt, receivedAt, &diffSecs, &diffUsecs);

	/* seconds + microseconds -> whole milliseconds */
	return ((int) diffSecs * 1000) + (diffUsecs / 1000);
}
XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
Definition at line 291 of file walreceiverfuncs.c.
References WalRcvData::latestChunkStart, WalRcvData::mutex, WalRcvData::receivedTLI, WalRcvData::receivedUpto, SpinLockAcquire, and SpinLockRelease.
Referenced by CreateRestartPoint(), GetStandbyFlushRecPtr(), pg_last_xlog_receive_location(), and WaitForWALToBecomeAvailable().
{
	/*
	 * Return the received-up-to position from shared memory.
	 *
	 * latestChunkStart and receiveTLI are optional outputs: each one that is
	 * non-null receives the matching shared-memory field, read under the
	 * same spinlock acquisition as the return value.
	 */

	/* volatile pointer keeps the shared-memory accesses from being rearranged */
	volatile WalRcvData *shared = WalRcv;
	XLogRecPtr	receivedUpto;

	SpinLockAcquire(&shared->mutex);

	receivedUpto = shared->receivedUpto;

	if (latestChunkStart)
		*latestChunkStart = shared->latestChunkStart;

	if (receiveTLI)
		*receiveTLI = shared->receivedTLI;

	SpinLockRelease(&shared->mutex);

	return receivedUpto;
}
void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
Definition at line 226 of file walreceiverfuncs.c.
References Assert, WalRcvData::conninfo, WalRcvData::latch, WalRcvData::latestChunkStart, MAXCONNINFO, WalRcvData::mutex, now(), NULL, PMSIGNAL_START_WALRECEIVER, WalRcvData::receivedUpto, WalRcvData::receiveStart, WalRcvData::receiveStartTLI, SendPostmasterSignal(), SetLatch(), SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, strlcpy(), WALRCV_STOPPED, WALRCV_WAITING, WalRcvData::walRcvState, and XLogSegSize.
Referenced by WaitForWALToBecomeAvailable().
{
	/*
	 * Ask for WAL streaming to begin at recptr on timeline tli, using
	 * conninfo (NULL is stored as an empty string).
	 *
	 * If the walreceiver state is STOPPED the postmaster is signalled to
	 * launch one; otherwise (WAITING) the state is set to RESTARTING and the
	 * walreceiver's latch is set.
	 */

	/* volatile pointer keeps the shared-memory accesses from being rearranged */
	volatile WalRcvData *shared = WalRcv;
	pg_time_t	requestTime = (pg_time_t) time(NULL);
	bool		mustLaunch = false;

	/*
	 * Always begin at a segment boundary.  Otherwise XLOG streaming could
	 * create a broken segment (i.e., one with no records in its first half),
	 * which might cause trouble later on, e.g. if the segment is archived.
	 */
	recptr -= recptr % XLogSegSize;

	SpinLockAcquire(&shared->mutex);

	/* A restart request only makes sense while it is stopped or waiting. */
	Assert(shared->walRcvState == WALRCV_STOPPED ||
		   shared->walRcvState == WALRCV_WAITING);

	if (conninfo == NULL)
		shared->conninfo[0] = '\0';
	else
		strlcpy((char *) shared->conninfo, conninfo, MAXCONNINFO);

	if (shared->walRcvState != WALRCV_STOPPED)
		shared->walRcvState = WALRCV_RESTARTING;
	else
	{
		shared->walRcvState = WALRCV_STARTING;
		mustLaunch = true;
	}
	shared->startTime = requestTime;

	/*
	 * On the very first startup of walreceiver, receivedUpto and
	 * latestChunkStart are seeded with the start position.
	 */
	if (shared->receiveStart == 0)
	{
		shared->receivedUpto = recptr;
		shared->latestChunkStart = recptr;
	}
	shared->receiveStart = recptr;
	shared->receiveStartTLI = tli;

	SpinLockRelease(&shared->mutex);

	/* Either have a new process started, or wake the existing one. */
	if (!mustLaunch)
		SetLatch(&shared->latch);
	else
		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
}
void ShutdownWalRcv(void)
Definition at line 166 of file walreceiverfuncs.c.
References HandleStartupProcInterrupts(), WalRcvData::mutex, pg_usleep(), WalRcvData::pid, SpinLockAcquire, SpinLockRelease, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvRunning(), and WalRcvData::walRcvState.
Referenced by StartupXLOG(), and WaitForWALToBecomeAvailable().
{
	/*
	 * Stop the walreceiver and wait until WalRcvRunning() reports that it is
	 * no longer running.
	 */

	/* volatile pointer keeps the shared-memory accesses from being rearranged */
	volatile WalRcvData *shared = WalRcv;
	pid_t		targetPid = 0;
	WalRcvState curState;

	/*
	 * Ask walreceiver to stop.  Once it has finished it will switch to
	 * WALRCV_STOPPED mode, and will also request postmaster to not restart
	 * itself.
	 */
	SpinLockAcquire(&shared->mutex);
	curState = shared->walRcvState;
	if (curState == WALRCV_STARTING)
	{
		/* Mark it stopped directly; no signal is sent in this case. */
		shared->walRcvState = WALRCV_STOPPED;
	}
	else if (curState == WALRCV_STREAMING ||
			 curState == WALRCV_WAITING ||
			 curState == WALRCV_RESTARTING ||
			 curState == WALRCV_STOPPING)
	{
		/* Record the stop request unless one is already pending. */
		if (curState != WALRCV_STOPPING)
			shared->walRcvState = WALRCV_STOPPING;
		targetPid = shared->pid;
	}
	/* WALRCV_STOPPED: nothing to change */
	SpinLockRelease(&shared->mutex);

	/* Signal the walreceiver process if a pid was picked up above. */
	if (targetPid != 0)
		kill(targetPid, SIGTERM);

	/*
	 * Wait for walreceiver to acknowledge its death by setting the state to
	 * WALRCV_STOPPED.
	 */
	for (;;)
	{
		if (!WalRcvRunning())
			break;

		/*
		 * This wait can be long, so interrupts of the startup process have
		 * to be handled on every iteration.
		 */
		HandleStartupProcInterrupts();

		pg_usleep(100000);		/* 100ms */
	}
}
bool WalRcvRunning(void)
Definition at line 73 of file walreceiverfuncs.c.
References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_STARTING, WALRCV_STOPPED, and WalRcvData::walRcvState.
Referenced by ShutdownWalRcv().
{
	/*
	 * Report whether the walreceiver state is anything other than
	 * WALRCV_STOPPED.  As a side effect, a walreceiver that has been in
	 * WALRCV_STARTING for longer than WALRCV_STARTUP_TIMEOUT is marked
	 * stopped.
	 */

	/* volatile pointer keeps the shared-memory accesses from being rearranged */
	volatile WalRcvData *shared = WalRcv;
	WalRcvState curState;
	pg_time_t	launchedAt;

	SpinLockAcquire(&shared->mutex);
	curState = shared->walRcvState;
	launchedAt = shared->startTime;
	SpinLockRelease(&shared->mutex);

	/*
	 * Give up on a walreceiver that is taking too long to start.  Once the
	 * state says STOPPED, a walreceiver that does come up late after all
	 * will see that it is not supposed to be running and die without doing
	 * anything.
	 */
	if (curState == WALRCV_STARTING)
	{
		pg_time_t	elapsed = (pg_time_t) time(NULL) - launchedAt;

		if (elapsed > WALRCV_STARTUP_TIMEOUT)
		{
			SpinLockAcquire(&shared->mutex);

			/* Re-check under the lock: the state may have moved on. */
			if (shared->walRcvState == WALRCV_STARTING)
			{
				shared->walRcvState = WALRCV_STOPPED;
				curState = WALRCV_STOPPED;
			}

			SpinLockRelease(&shared->mutex);
		}
	}

	if (curState == WALRCV_STOPPED)
		return false;

	return true;
}
void WalRcvShmemInit(void)
Definition at line 54 of file walreceiverfuncs.c.
References InitSharedLatch(), WalRcvData::latch, MemSet, WalRcvData::mutex, ShmemInitStruct(), SpinLockInit, WalRcvShmemSize(), and WalRcvData::walRcvState.
Referenced by CreateSharedMemoryAndSemaphores().
{
	/*
	 * Attach WalRcv to the "Wal Receiver Ctl" shared-memory struct, and
	 * initialize it if it did not exist yet.
	 */
	bool		alreadyExists;

	WalRcv = (WalRcvData *)
		ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &alreadyExists);

	/* Somebody set it up before us: nothing further to do. */
	if (alreadyExists)
		return;

	/* First time through: zero everything, then set up the non-zero parts. */
	MemSet(WalRcv, 0, WalRcvShmemSize());
	WalRcv->walRcvState = WALRCV_STOPPED;
	SpinLockInit(&WalRcv->mutex);
	InitSharedLatch(&WalRcv->latch);
}
Size WalRcvShmemSize(void)
Definition at line 43 of file walreceiverfuncs.c.
References add_size().
Referenced by CreateSharedMemoryAndSemaphores(), and WalRcvShmemInit().
{
	/*
	 * Report the shared-memory space needed for walreceiver control data:
	 * one WalRcvData struct, with the total accumulated through add_size().
	 */
	Size		total = add_size(0, sizeof(WalRcvData));

	return total;
}
bool WalRcvStreaming(void)
Definition at line 119 of file walreceiverfuncs.c.
References WalRcvData::mutex, now(), NULL, SpinLockAcquire, SpinLockRelease, WalRcvData::startTime, WALRCV_RESTARTING, WALRCV_STARTING, WALRCV_STREAMING, and WalRcvData::walRcvState.
Referenced by WaitForWALToBecomeAvailable().
{
	/*
	 * Report whether the walreceiver state is STREAMING, STARTING or
	 * RESTARTING.  As a side effect, a walreceiver that has been in
	 * WALRCV_STARTING for longer than WALRCV_STARTUP_TIMEOUT is marked
	 * stopped.
	 */

	/* volatile pointer keeps the shared-memory accesses from being rearranged */
	volatile WalRcvData *shared = WalRcv;
	WalRcvState curState;
	pg_time_t	launchedAt;

	SpinLockAcquire(&shared->mutex);
	curState = shared->walRcvState;
	launchedAt = shared->startTime;
	SpinLockRelease(&shared->mutex);

	/*
	 * Give up on a walreceiver that is taking too long to start.  Once the
	 * state says STOPPED, a walreceiver that does come up late after all
	 * will see that it is not supposed to be running and die without doing
	 * anything.
	 */
	if (curState == WALRCV_STARTING)
	{
		pg_time_t	elapsed = (pg_time_t) time(NULL) - launchedAt;

		if (elapsed > WALRCV_STARTUP_TIMEOUT)
		{
			SpinLockAcquire(&shared->mutex);

			/* Re-check under the lock: the state may have moved on. */
			if (shared->walRcvState == WALRCV_STARTING)
			{
				shared->walRcvState = WALRCV_STOPPED;
				curState = WALRCV_STOPPED;
			}

			SpinLockRelease(&shared->mutex);
		}
	}

	if (curState == WALRCV_STREAMING)
		return true;
	if (curState == WALRCV_STARTING)
		return true;
	if (curState == WALRCV_RESTARTING)
		return true;

	return false;
}
void WalReceiverMain(void)
Definition at line 187 of file walreceiver.c.
References Assert, BlockSig, buf, close, WalRcvData::conninfo, CurrentResourceOwner, DEBUG1, DisableWalRcvImmediateExit(), elog, EnableWalRcvImmediateExit(), ereport, errcode_for_file_access(), errdetail(), errmsg(), ERROR, FATAL, GetCurrentTimestamp(), GetXLogReplayRecPtr(), got_SIGHUP, initStringInfo(), WalRcvData::lastMsgReceiptTime, WalRcvData::lastMsgSendTime, WalRcvData::latch, WalRcvData::latestWalEndTime, load_file(), LOG, LogstreamResult, MAXCONNINFO, WalRcvData::mutex, MyProcPid, NAPTIME_PER_CYCLE, now(), NULL, on_shmem_exit(), OwnLatch(), PANIC, PG_SETMASK, PGC_SIGHUP, WalRcvData::pid, ping_sent, PostmasterIsAlive(), pqsignal(), proc_exit(), ProcessConfigFile(), ProcessWalRcvInterrupts(), WalRcvData::receiveStart, WalRcvData::receiveStartTLI, RecoveryInProgress(), recvFile, recvFileTLI, recvSegNo, ResourceOwnerCreate(), SIG_DFL, SIG_IGN, SIGALRM, SIGCHLD, SIGCONT, sigdelset, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, SpinLockAcquire, SpinLockRelease, strlcpy(), ThisTimeLineID, TimestampTzPlusMilliseconds, UnBlockSig, wal_receiver_timeout, WalRcv, walrcv_connect, walrcv_disconnect, walrcv_endstreaming, walrcv_identify_system, walrcv_readtimelinehistoryfile, walrcv_receive, WALRCV_RESTARTING, walrcv_send, WALRCV_STARTING, walrcv_startstreaming, WALRCV_STOPPED, WALRCV_STOPPING, WALRCV_STREAMING, WALRCV_WAITING, WalRcvDie(), WalRcvFetchTimeLineHistoryFiles(), WalRcvQuickDieHandler(), WalRcvShutdownHandler(), WalRcvSigHupHandler(), WalRcvSigUsr1Handler(), WalRcvData::walRcvState, WalRcvWaitForStartPosition(), XLogArchiveForceDone(), XLogFileName, XLogFileNameP(), XLogWalRcvFlush(), XLogWalRcvProcessMsg(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().
Referenced by AuxiliaryProcessMain().
{
	/*
	 * Main entry point of the walreceiver process.
	 *
	 * Marks itself as running in shared memory, installs signal handlers,
	 * loads libpqwalreceiver, connects to the primary, and then loops
	 * forever: identify the primary, stream WAL until streaming ends, close
	 * the current segment, and wait for a new start position.  The outer
	 * loop has no exit; the function leaves only through proc_exit(),
	 * exit() or an error report.
	 */
	char		conninfo[MAXCONNINFO];
	XLogRecPtr	startpoint;
	TimeLineID	startpointTLI;
	TimeLineID	primaryTLI;
	bool		first_stream;

	/* use volatile pointer to prevent code rearrangement */
	volatile WalRcvData *walrcv = WalRcv;
	TimestampTz last_recv_timestamp;
	bool		ping_sent;

	/*
	 * WalRcv should be set up already (if we are a backend, we inherit this
	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
	 */
	Assert(walrcv != NULL);

	/*
	 * Mark walreceiver as running in shared memory.
	 *
	 * Do this as early as possible, so that if we fail later on, we'll set
	 * state to STOPPED. If we die before this, the startup process will keep
	 * waiting for us to start up, until it times out.
	 */
	SpinLockAcquire(&walrcv->mutex);
	Assert(walrcv->pid == 0);
	switch (walrcv->walRcvState)
	{
		case WALRCV_STOPPING:
			/* If we've already been requested to stop, don't start up. */
			walrcv->walRcvState = WALRCV_STOPPED;
			/* fall through */

		case WALRCV_STOPPED:
			SpinLockRelease(&walrcv->mutex);
			proc_exit(1);
			break;

		case WALRCV_STARTING:
			/* The usual case */
			break;

		case WALRCV_WAITING:
		case WALRCV_STREAMING:
		case WALRCV_RESTARTING:
		default:
			/*
			 * Shouldn't happen.
			 *
			 * NOTE(review): this PANIC is raised with walrcv->mutex still
			 * held; no SpinLockRelease precedes it on this path.
			 */
			elog(PANIC, "walreceiver still running according to shared memory state");
	}
	/* Advertise our PID so that the startup process can kill us */
	walrcv->pid = MyProcPid;
	walrcv->walRcvState = WALRCV_STREAMING;

	/* Fetch information required to start streaming (still under the lock) */
	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
	startpoint = walrcv->receiveStart;
	startpointTLI = walrcv->receiveStartTLI;

	/* Initialise to a sanish value */
	walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();

	SpinLockRelease(&walrcv->mutex);

	/* Arrange to clean up at walreceiver exit */
	on_shmem_exit(WalRcvDie, 0);

	OwnLatch(&walrcv->latch);

	/*
	 * If possible, make this process a group leader, so that the postmaster
	 * can signal any child processes too.  (walreceiver probably never has
	 * any child processes, but for consistency we make all postmaster child
	 * processes do this.)
	 */
#ifdef HAVE_SETSID
	if (setsid() < 0)
		elog(FATAL, "setsid() failed: %m");
#endif

	/* Properly accept or ignore signals the postmaster might send us */
	pqsignal(SIGHUP, WalRcvSigHupHandler);		/* set flag to read config
												 * file */
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, WalRcvShutdownHandler);	/* request shutdown */
	pqsignal(SIGQUIT, WalRcvQuickDieHandler);	/* hard crash time */
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
	pqsignal(SIGUSR2, SIG_IGN);

	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);

	/* We allow SIGQUIT (quickdie) at all times */
	sigdelset(&BlockSig, SIGQUIT);

	/*
	 * Load the libpq-specific functions.  Every walrcv_* hook must have been
	 * filled in by the time load_file() returns, otherwise we error out.
	 */
	load_file("libpqwalreceiver", false);
	if (walrcv_connect == NULL || walrcv_startstreaming == NULL || walrcv_endstreaming == NULL || walrcv_identify_system == NULL || walrcv_readtimelinehistoryfile == NULL || walrcv_receive == NULL || walrcv_send == NULL || walrcv_disconnect == NULL)
		elog(ERROR, "libpqwalreceiver didn't initialize correctly");

	/*
	 * Create a resource owner to keep track of our resources (not clear that
	 * we need this, but may as well have one).
	 */
	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

	/* Unblock signals (they were blocked when the postmaster forked us) */
	PG_SETMASK(&UnBlockSig);

	/* Establish the connection to the primary for XLOG streaming */
	EnableWalRcvImmediateExit();
	walrcv_connect(conninfo);
	DisableWalRcvImmediateExit();

	/* Only affects which LOG message is emitted when streaming starts */
	first_stream = true;
	for (;;)
	{
		/*
		 * Check that we're connected to a valid server using the
		 * IDENTIFY_SYSTEM replication command.
		 */
		EnableWalRcvImmediateExit();
		walrcv_identify_system(&primaryTLI);
		DisableWalRcvImmediateExit();

		/*
		 * Confirm that the current timeline of the primary is the same or
		 * ahead of ours.
		 */
		if (primaryTLI < startpointTLI)
			ereport(ERROR, (errmsg("highest timeline %u of the primary is behind recovery timeline %u", primaryTLI, startpointTLI)));

		/*
		 * Get any missing history files. We do this always, even when we're
		 * not interested in that timeline, so that if we're promoted to become
		 * the master later on, we don't select the same timeline that was
		 * already used in the current master. This isn't bullet-proof - you'll
		 * need some external software to manage your cluster if you need to
		 * ensure that a unique timeline id is chosen in every case, but let's
		 * avoid the confusion of timeline id collisions where we can.
		 */
		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);

		/*
		 * Start streaming.
		 *
		 * We'll try to start at the requested starting point and timeline,
		 * even if it's different from the server's latest timeline. In case
		 * we've already reached the end of the old timeline, the server will
		 * finish the streaming immediately, and we will go back to await
		 * orders from the startup process. If recovery_target_timeline is
		 * 'latest', the startup process will scan pg_xlog and find the new
		 * history file, bump recovery target timeline, and ask us to restart
		 * on the new timeline.
		 */
		ThisTimeLineID = startpointTLI;
		if (walrcv_startstreaming(startpointTLI, startpoint))
		{
			bool		endofwal = false;

			if (first_stream)
				ereport(LOG, (errmsg("started streaming WAL from primary at %X/%X on timeline %u", (uint32) (startpoint >> 32), (uint32) startpoint, startpointTLI)));
			else
				ereport(LOG, (errmsg("restarted WAL streaming at %X/%X on timeline %u", (uint32) (startpoint >> 32), (uint32) startpoint, startpointTLI)));
			first_stream = false;

			/* Initialize LogstreamResult and buffers for processing messages */
			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
			initStringInfo(&reply_message);
			initStringInfo(&incoming_message);

			/* Initialize the last recv timestamp */
			last_recv_timestamp = GetCurrentTimestamp();
			ping_sent = false;

			/* Loop until end-of-streaming or error */
			while (!endofwal)
			{
				char	   *buf;
				int			len;

				/*
				 * Emergency bailout if postmaster has died.  This is to avoid
				 * the necessity for manual cleanup of all postmaster children.
				 */
				if (!PostmasterIsAlive())
					exit(1);

				/*
				 * Exit walreceiver if we're not in recovery. This should not
				 * happen, but cross-check the status here.
				 */
				if (!RecoveryInProgress())
					ereport(FATAL, (errmsg("cannot continue WAL streaming, recovery has already ended")));

				/* Process any requests or signals received recently */
				ProcessWalRcvInterrupts();

				if (got_SIGHUP)
				{
					got_SIGHUP = false;
					ProcessConfigFile(PGC_SIGHUP);
					XLogWalRcvSendHSFeedback(true);
				}

				/* Wait a while for data to arrive */
				len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
				if (len != 0)
				{
					/*
					 * Process the received data, and any subsequent data we
					 * can read without blocking.
					 *
					 * len > 0: a message arrived; buf[0] is handed over
					 * separately from the remaining len - 1 bytes.
					 * len == 0: nothing more to read right now.
					 * len < 0: streaming was ended; logged as termination
					 * by the primary, and the receive loop is left.
					 */
					for (;;)
					{
						if (len > 0)
						{
							/* Something was received from master, so reset timeout */
							last_recv_timestamp = GetCurrentTimestamp();
							ping_sent = false;
							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
						}
						else if (len == 0)
							break;
						else if (len < 0)
						{
							ereport(LOG, (errmsg("replication terminated by primary server"), errdetail("End of WAL reached on timeline %u at %X/%X", startpointTLI, (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
							endofwal = true;
							break;
						}
						/* Zero timeout for the follow-up reads */
						len = walrcv_receive(0, &buf);
					}

					/* Let the master know that we received some data. */
					XLogWalRcvSendReply(false, false);

					/*
					 * If we've written some records, flush them to disk and
					 * let the startup process and primary server know about
					 * them.
					 */
					XLogWalRcvFlush(false);
				}
				else
				{
					/*
					 * We didn't receive anything new. If we haven't heard
					 * anything from the server for more than
					 * wal_receiver_timeout / 2, ping the server. Also, if it's
					 * been longer than wal_receiver_status_interval since the
					 * last update we sent, send a status update to the master
					 * anyway, to report any progress in applying WAL.
					 */
					bool		requestReply = false;

					/*
					 * Check if the time since we last received anything from
					 * the primary has reached the configured limit.  A
					 * non-positive wal_receiver_timeout skips the check.
					 */
					if (wal_receiver_timeout > 0)
					{
						TimestampTz now = GetCurrentTimestamp();
						TimestampTz timeout;

						timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, wal_receiver_timeout);

						if (now >= timeout)
							ereport(ERROR, (errmsg("terminating walreceiver due to timeout")));

						/*
						 * We didn't receive anything new, for half of receiver
						 * replication timeout. Ping the server, but only once
						 * until something is received again.
						 */
						if (!ping_sent)
						{
							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, (wal_receiver_timeout/2));
							if (now >= timeout)
							{
								requestReply = true;
								ping_sent = true;
							}
						}
					}

					XLogWalRcvSendReply(requestReply, requestReply);
					XLogWalRcvSendHSFeedback(false);
				}
			}

			/*
			 * The backend finished streaming. Exit streaming COPY-mode from
			 * our side, too.
			 */
			EnableWalRcvImmediateExit();
			walrcv_endstreaming(&primaryTLI);
			DisableWalRcvImmediateExit();

			/*
			 * If the server had switched to a new timeline that we didn't know
			 * about when we began streaming, fetch its timeline history file
			 * now.
			 */
			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
		}
		else
			ereport(LOG, (errmsg("primary server contains no more WAL on requested timeline %u", startpointTLI)));

		/*
		 * End of WAL reached on the requested timeline. Close the last
		 * segment, and wait for new orders from the startup process.
		 */
		if (recvFile >= 0)
		{
			char		xlogfname[MAXFNAMELEN];

			XLogWalRcvFlush(false);
			if (close(recvFile) != 0)
				ereport(PANIC, (errcode_for_file_access(), errmsg("could not close log segment %s: %m", XLogFileNameP(recvFileTLI, recvSegNo))));

			/*
			 * Create .done file forcibly to prevent the streamed segment from
			 * being archived later.
			 */
			XLogFileName(xlogfname, recvFileTLI, recvSegNo);
			XLogArchiveForceDone(xlogfname);
		}
		/* No segment file is open from here on */
		recvFile = -1;

		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
		/* Updates startpoint and startpointTLI for the next iteration */
		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
	}
	/* not reached */
}
Definition at line 69 of file walreceiver.c.
Referenced by XLogWalRcvSendHSFeedback().
Definition at line 67 of file walreceiver.c.
Referenced by XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().
Definition at line 68 of file walreceiver.c.
Referenced by WalReceiverMain().
WalRcvData* WalRcv
Definition at line 33 of file walreceiverfuncs.c.
Referenced by ProcessWalSndrMessage(), WalRcvDie(), WalRcvShutdownHandler(), WalRcvWaitForStartPosition(), WalReceiverMain(), and XLogWalRcvFlush().
PGDLLIMPORT walrcv_connect_type walrcv_connect
Definition at line 72 of file walreceiver.c.
Referenced by _PG_init(), and WalReceiverMain().
PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect
Definition at line 79 of file walreceiver.c.
Referenced by _PG_init(), WalRcvDie(), and WalReceiverMain().
PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming
Definition at line 75 of file walreceiver.c.
Referenced by _PG_init(), and WalReceiverMain().
PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system
Definition at line 73 of file walreceiver.c.
Referenced by _PG_init(), and WalReceiverMain().
PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile
Definition at line 76 of file walreceiver.c.
Referenced by _PG_init(), WalRcvFetchTimeLineHistoryFiles(), and WalReceiverMain().
PGDLLIMPORT walrcv_receive_type walrcv_receive
Definition at line 77 of file walreceiver.c.
Referenced by _PG_init(), and WalReceiverMain().
PGDLLIMPORT walrcv_send_type walrcv_send
Definition at line 78 of file walreceiver.c.
Referenced by _PG_init(), WalReceiverMain(), XLogWalRcvSendHSFeedback(), and XLogWalRcvSendReply().
PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming
Definition at line 74 of file walreceiver.c.
Referenced by _PG_init(), and WalReceiverMain().