#include <signal.h>
#include "fmgr.h"
Go to the source code of this file.
Defines | |
#define | WalSndWakeupRequest() do { wake_wal_senders = true; } while (0) |
#define | WalSndWakeupProcessRequests() |
Functions | |
void | InitWalSender (void) |
void | exec_replication_command (const char *query_string) |
void | WalSndErrorCleanup (void) |
void | WalSndSignals (void) |
Size | WalSndShmemSize (void) |
void | WalSndShmemInit (void) |
void | WalSndWakeup (void) |
void | WalSndRqstFileReload (void) |
Datum | pg_stat_get_wal_senders (PG_FUNCTION_ARGS) |
Variables | |
bool | am_walsender |
bool | am_cascading_walsender |
bool | wake_wal_senders |
int | max_wal_senders |
int | wal_sender_timeout |
#define WalSndWakeupProcessRequests | ( | ) |
do \ { \ if (wake_wal_senders) \ { \ wake_wal_senders = false; \ if (max_wal_senders > 0) \ WalSndWakeup(); \ } \ } while (0)
Definition at line 51 of file walsender.h.
Referenced by XLogBackgroundFlush(), XLogFlush(), and XLogInsert().
#define WalSndWakeupRequest | ( | ) | do { wake_wal_senders = true; } while (0) |
Definition at line 45 of file walsender.h.
Referenced by XLogWrite().
void exec_replication_command | ( | const char * | query_string | ) |
Definition at line 606 of file walsender.c.
References ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, ALLOCSET_DEFAULT_MINSIZE, AllocSetContextCreate(), CHECK_FOR_INTERRUPTS, CurrentMemoryContext, DEBUG1, DestRemote, elog, EndCommand(), ereport, errcode(), errmsg_internal(), ERROR, IdentifySystem(), MemoryContextDelete(), MemoryContextSwitchTo(), replication_parse_result, replication_scanner_init(), replication_yyparse(), SendBaseBackup(), SendTimeLineHistory(), StartReplication(), T_BaseBackupCmd, T_IdentifySystemCmd, T_StartReplicationCmd, T_TimeLineHistoryCmd, TimeLineHistoryCmd::type, and Node::type.
Referenced by PostgresMain().
{
	/*
	 * Parse the replication command text in cmd_string and hand the
	 * resulting node to the matching handler.  All work is done inside a
	 * dedicated memory context that is deleted once the command has run.
	 */
	MemoryContext caller_cxt;
	MemoryContext command_cxt;
	Node	   *parsed_cmd;
	int			parser_status;

	elog(DEBUG1, "received replication command: %s", cmd_string);

	CHECK_FOR_INTERRUPTS();

	/* Allocations made while handling the command go into command_cxt. */
	command_cxt = AllocSetContextCreate(CurrentMemoryContext,
										"Replication command context",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);
	caller_cxt = MemoryContextSwitchTo(command_cxt);

	/* Run the replication grammar over the command text. */
	replication_scanner_init(cmd_string);
	parser_status = replication_yyparse();
	if (parser_status != 0)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 (errmsg_internal("replication command parser returned %d",
								  parser_status))));

	/* The parser leaves its result in replication_parse_result. */
	parsed_cmd = replication_parse_result;

	/* Dispatch on the node tag of the parsed command. */
	switch (parsed_cmd->type)
	{
		case T_IdentifySystemCmd:
			IdentifySystem();
			break;

		case T_StartReplicationCmd:
			StartReplication((StartReplicationCmd *) parsed_cmd);
			break;

		case T_BaseBackupCmd:
			SendBaseBackup((BaseBackupCmd *) parsed_cmd);
			break;

		case T_TimeLineHistoryCmd:
			SendTimeLineHistory((TimeLineHistoryCmd *) parsed_cmd);
			break;

		default:
			elog(ERROR, "unrecognized replication command node tag: %u",
				 parsed_cmd->type);
	}

	/* Return to the caller's context and drop the per-command one. */
	MemoryContextSwitchTo(caller_cxt);
	MemoryContextDelete(command_cxt);

	/* Send CommandComplete message */
	EndCommand("SELECT", DestRemote);
}
void InitWalSender | ( | void | ) |
Definition at line 187 of file walsender.c.
References am_cascading_walsender, CurrentResourceOwner, InitWalSenderSlot(), MarkPostmasterChildWalSender(), NULL, PMSIGNAL_ADVANCE_STATE_MACHINE, RecoveryInProgress(), ResourceOwnerCreate(), and SendPostmasterSignal().
Referenced by PostgresMain().
{
	/*
	 * Initialize this process as a WAL sender: record whether it is a
	 * cascading one, claim a shared-memory slot, set up a resource owner
	 * and announce ourselves to the postmaster.
	 */

	/* We are a cascading walsender exactly when recovery is in progress. */
	am_cascading_walsender = RecoveryInProgress();

	/* Create a per-walsender data structure in shared memory */
	InitWalSenderSlot();

	/* Set up resource owner */
	CurrentResourceOwner =
		ResourceOwnerCreate(NULL, "walsender top-level resource owner");

	/*
	 * Tell the postmaster that this child is a WAL sender.  From then on
	 * the postmaster lets us outlive the bgwriter and kills us last during
	 * shutdown, which gives us the chance to stream all remaining WAL,
	 * including the shutdown checkpoint.  This is a one-way switch, and no
	 * WAL records may be written by this process afterwards.
	 */
	MarkPostmasterChildWalSender();
	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
}
Datum pg_stat_get_wal_senders | ( | PG_FUNCTION_ARGS | ) |
Definition at line 1838 of file walsender.c.
References ReturnSetInfo::allowedModes, WalSnd::apply, CStringGetTextDatum, ReturnSetInfo::econtext, ExprContext::ecxt_per_query_memory, elog, ereport, errcode(), errmsg(), ERROR, WalSnd::flush, get_call_result_type(), i, Int32GetDatum, IsA, LW_SHARED, LWLockAcquire(), LWLockRelease(), max_wal_senders, MemoryContextSwitchTo(), MemSet, WalSnd::mutex, NULL, palloc(), pfree(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, ReturnSetInfo::returnMode, WalSnd::sentPtr, sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SFRM_Materialize, snprintf(), SpinLockAcquire, SpinLockRelease, WalSnd::state, superuser(), WalSnd::sync_standby_priority, SyncRepLock, tuplestore_begin_heap(), tuplestore_donestoring, tuplestore_putvalues(), values, WalSndGetStateString(), WalSndCtlData::walsnds, WALSNDSTATE_STREAMING, work_mem, WalSnd::write, and XLogRecPtrIsInvalid.
{
	/*
	 * Return one row per in-use walsender slot, materialized into a
	 * tuplestore: pid, state, sent/write/flush/apply locations, sync
	 * priority and a textual sync state.  Callers that are not superusers
	 * get only the pid; every other column is flagged null.
	 *
	 * Raises ERROR if the caller cannot accept a materialized set or if the
	 * declared result type is not a row type.
	 */
#define PG_STAT_GET_WAL_SENDERS_COLS	8
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	int		   *sync_priority;
	int			priority = 0;
	int			sync_standby = -1;
	int			i;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not "
						"allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	/* The tuplestore must be created in the per-query memory context. */
	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	/*
	 * Get the priorities of sync standbys all in one go, to minimise lock
	 * acquisitions and to allow us to evaluate who is the current sync
	 * standby. This code must match the code in SyncRepReleaseWaiters().
	 */
	sync_priority = palloc(sizeof(int) * max_wal_senders);
	LWLockAcquire(SyncRepLock, LW_SHARED);
	for (i = 0; i < max_wal_senders; i++)
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];

		if (walsnd->pid != 0)
		{
			/*
			 * Treat a standby such as a pg_basebackup background process
			 * which always returns an invalid flush location, as an
			 * asynchronous standby.
			 */
			sync_priority[i] = XLogRecPtrIsInvalid(walsnd->flush) ?
				0 : walsnd->sync_standby_priority;

			if (walsnd->state == WALSNDSTATE_STREAMING &&
				walsnd->sync_standby_priority > 0 &&
				(priority == 0 ||
				 priority > walsnd->sync_standby_priority) &&
				!XLogRecPtrIsInvalid(walsnd->flush))
			{
				priority = walsnd->sync_standby_priority;
				sync_standby = i;
			}
		}
		else
		{
			/*
			 * The array comes from palloc and is not zeroed.  Give unused
			 * slots a defined priority, so that a slot which becomes
			 * active before the loop below reads a valid value (and is
			 * shown as async) rather than uninitialized memory.
			 */
			sync_priority[i] = 0;
		}
	}
	LWLockRelease(SyncRepLock);

	for (i = 0; i < max_wal_senders; i++)
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
		char		location[MAXFNAMELEN];
		XLogRecPtr	sentPtr;
		XLogRecPtr	write;
		XLogRecPtr	flush;
		XLogRecPtr	apply;
		WalSndState state;
		int			pid;
		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];

		/*
		 * Take a single snapshot of the slot, pid included.  Reading pid
		 * only once guarantees that the value tested here is the value
		 * reported below, so a slot released concurrently can never show
		 * up as a row with pid 0.
		 */
		SpinLockAcquire(&walsnd->mutex);
		pid = walsnd->pid;
		sentPtr = walsnd->sentPtr;
		state = walsnd->state;
		write = walsnd->write;
		flush = walsnd->flush;
		apply = walsnd->apply;
		SpinLockRelease(&walsnd->mutex);

		/* Skip slots that are not in use. */
		if (pid == 0)
			continue;

		memset(nulls, 0, sizeof(nulls));
		values[0] = Int32GetDatum(pid);

		if (!superuser())
		{
			/*
			 * Only superusers can see details. Other users only get the pid
			 * value to know it's a walsender, but no details.
			 */
			MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
		}
		else
		{
			values[1] = CStringGetTextDatum(WalSndGetStateString(state));

			snprintf(location, sizeof(location), "%X/%X",
					 (uint32) (sentPtr >> 32), (uint32) sentPtr);
			values[2] = CStringGetTextDatum(location);

			/* A zero write/flush/apply location is flagged as null. */
			if (write == 0)
				nulls[3] = true;
			snprintf(location, sizeof(location), "%X/%X",
					 (uint32) (write >> 32), (uint32) write);
			values[3] = CStringGetTextDatum(location);

			if (flush == 0)
				nulls[4] = true;
			snprintf(location, sizeof(location), "%X/%X",
					 (uint32) (flush >> 32), (uint32) flush);
			values[4] = CStringGetTextDatum(location);

			if (apply == 0)
				nulls[5] = true;
			snprintf(location, sizeof(location), "%X/%X",
					 (uint32) (apply >> 32), (uint32) apply);
			values[5] = CStringGetTextDatum(location);

			values[6] = Int32GetDatum(sync_priority[i]);

			/*
			 * More easily understood version of standby state. This is
			 * purely informational, not different from priority.
			 */
			if (sync_priority[i] == 0)
				values[7] = CStringGetTextDatum("async");
			else if (i == sync_standby)
				values[7] = CStringGetTextDatum("sync");
			else
				values[7] = CStringGetTextDatum("potential");
		}

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}
	pfree(sync_priority);

	/* clean up and return the tuplestore */
	tuplestore_donestoring(tupstore);

	return (Datum) 0;
}
void WalSndErrorCleanup | ( | void | ) |
Definition at line 216 of file walsender.c.
References close, proc_exit(), replication_active, sendFile, walsender_ready_to_stop, WalSndSetState(), and WALSNDSTATE_STARTUP.
Referenced by PostgresMain().
{
	/*
	 * Clean up after an error: close the WAL file being sent, if any, mark
	 * replication as inactive, and either exit (when a stop was already
	 * requested) or fall back to the startup state.
	 */
	if (sendFile >= 0)
	{
		close(sendFile);
		sendFile = -1;
	}

	replication_active = false;

	/* If we were already asked to stop, exit now instead of resetting. */
	if (walsender_ready_to_stop)
		proc_exit(0);

	/* Revert back to startup state */
	WalSndSetState(WALSNDSTATE_STARTUP);
}
void WalSndRqstFileReload | ( | void | ) |
Definition at line 1652 of file walsender.c.
References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, and WalSndCtlData::walsnds.
Referenced by KeepFileRestoredFromArchive().
{
	/*
	 * Set the needreload flag of every walsender slot that is in use
	 * (pid != 0).  Each flag is set while holding that slot's spinlock.
	 */
	int			slot;

	for (slot = 0; slot < max_wal_senders; slot++)
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile WalSnd *walsnd = &WalSndCtl->walsnds[slot];

		if (walsnd->pid != 0)
		{
			SpinLockAcquire(&walsnd->mutex);
			walsnd->needreload = true;
			SpinLockRelease(&walsnd->mutex);
		}
	}
}
void WalSndShmemInit | ( | void | ) |
Definition at line 1753 of file walsender.c.
References i, InitSharedLatch(), WalSnd::latch, max_wal_senders, MemSet, WalSnd::mutex, ShmemInitStruct(), SHMQueueInit(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::walsnds, and WalSndShmemSize().
Referenced by CreateSharedMemoryAndSemaphores().
{
	/*
	 * Attach to, or create and initialize, the shared WalSndCtl structure.
	 * Initialization happens only when ShmemInitStruct() reports that the
	 * structure did not exist yet.
	 */
	bool		already_exists;
	int			n;

	WalSndCtl = (WalSndCtlData *)
		ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &already_exists);

	if (already_exists)
		return;

	/* First time through, so initialize */
	MemSet(WalSndCtl, 0, WalSndShmemSize());

	/* One wait queue per sync rep wait mode. */
	for (n = 0; n < NUM_SYNC_REP_WAIT_MODE; n++)
		SHMQueueInit(&(WalSndCtl->SyncRepQueue[n]));

	/* Every walsender slot gets its own spinlock and latch. */
	for (n = 0; n < max_wal_senders; n++)
	{
		WalSnd	   *slot = &WalSndCtl->walsnds[n];

		SpinLockInit(&slot->mutex);
		InitSharedLatch(&slot->latch);
	}
}
Size WalSndShmemSize | ( | void | ) |
Definition at line 1741 of file walsender.c.
References add_size(), max_wal_senders, mul_size(), and offsetof.
Referenced by CreateSharedMemoryAndSemaphores(), and WalSndShmemInit().
{
	/*
	 * Shared memory needed for walsender control data: the fixed header of
	 * WalSndCtlData (everything before the walsnds array) plus one WalSnd
	 * entry per allowed walsender.  add_size()/mul_size() are used for the
	 * arithmetic, as in the original.
	 */
	Size		slots_bytes = mul_size(max_wal_senders, sizeof(WalSnd));

	return add_size(offsetof(WalSndCtlData, walsnds), slots_bytes);
}
void WalSndSignals | ( | void | ) |
Definition at line 1717 of file walsender.c.
References die(), InitializeTimeouts(), pqsignal(), quickdie(), SIG_DFL, SIG_IGN, SIGCHLD, SIGCONT, SIGHUP, SIGPIPE, SIGQUIT, SIGTTIN, SIGTTOU, SIGUSR1, SIGUSR2, SIGWINCH, WalSndLastCycleHandler(), WalSndSigHupHandler(), and WalSndXLogSendHandler().
Referenced by PostgresMain().
{
	/*
	 * Install the signal dispositions used by a walsender process.
	 */

	/* SIGHUP: set flag to read config file */
	pqsignal(SIGHUP, WalSndSigHupHandler);
	/* SIGINT: not used */
	pqsignal(SIGINT, SIG_IGN);
	/* SIGTERM: request shutdown */
	pqsignal(SIGTERM, die);
	/* SIGQUIT: hard crash time */
	pqsignal(SIGQUIT, quickdie);

	/* establishes SIGALRM handler */
	InitializeTimeouts();

	pqsignal(SIGPIPE, SIG_IGN);
	/* SIGUSR1: request WAL sending */
	pqsignal(SIGUSR1, WalSndXLogSendHandler);
	/* SIGUSR2: request a last cycle and shutdown */
	pqsignal(SIGUSR2, WalSndLastCycleHandler);

	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
}
void WalSndWakeup | ( | void | ) |
Definition at line 1786 of file walsender.c.
References i, WalSnd::latch, max_wal_senders, SetLatch(), and WalSndCtlData::walsnds.
Referenced by KeepFileRestoredFromArchive(), StartupXLOG(), and XLogWalRcvFlush().
bool am_cascading_walsender |
Definition at line 92 of file walsender.c.
Referenced by IdentifySystem(), InitWalSender(), ProcessStandbyReplyMessage(), StartReplication(), SyncRepGetStandbyPriority(), XLogRead(), and XLogSend().
bool am_walsender |
Definition at line 91 of file walsender.c.
Referenced by BackendInitialize(), check_db(), ClientAuthentication(), forbidden_in_wal_sender(), InitPostgres(), MarkPostmasterChildWalSender(), PerformAuthentication(), PostgresMain(), ProcessStartupPacket(), and WalSndSetState().
int max_wal_senders |
Definition at line 96 of file walsender.c.
Referenced by InitWalSenderSlot(), pg_stat_get_wal_senders(), PostmasterMain(), SpinlockSemas(), SyncRepReleaseWaiters(), WalSndRqstFileReload(), WalSndShmemInit(), WalSndShmemSize(), and WalSndWakeup().
bool wake_wal_senders |
Definition at line 102 of file walsender.c.
int wal_sender_timeout |
Definition at line 97 of file walsender.c.
Referenced by WalSndLoop().