#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "libpq-fe.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
Go to the source code of this file.
Functions | |
void | _PG_init (void) |
static void | libpqrcv_connect (char *conninfo) |
static void | libpqrcv_identify_system (TimeLineID *primary_tli) |
static void | libpqrcv_readtimelinehistoryfile (TimeLineID tli, char **filename, char **content, int *len) |
static bool | libpqrcv_startstreaming (TimeLineID tli, XLogRecPtr startpoint) |
static void | libpqrcv_endstreaming (TimeLineID *next_tli) |
static int | libpqrcv_receive (int timeout, char **buffer) |
static void | libpqrcv_send (const char *buffer, int nbytes) |
static void | libpqrcv_disconnect (void) |
static bool | libpq_select (int timeout_ms) |
static PGresult * | libpqrcv_PQexec (const char *query) |
Variables | |
PG_MODULE_MAGIC | |
static PGconn * | streamConn = NULL |
static char * | recvBuf = NULL |
void _PG_init | ( | void | ) |
Definition at line 66 of file libpqwalreceiver.c.
References elog, ERROR, NULL, walrcv_connect, walrcv_disconnect, walrcv_endstreaming, walrcv_identify_system, walrcv_readtimelinehistoryfile, walrcv_receive, walrcv_send, and walrcv_startstreaming.
{
	/*
	 * Tell walreceiver how to reach us: publish the libpq-based
	 * implementations in this file through the walrcv_* function pointers.
	 *
	 * All eight pointers must still be unset.  If even one of them is
	 * already populated we refuse to overwrite it and raise an ERROR.
	 */
	if (!(walrcv_connect == NULL &&
		  walrcv_identify_system == NULL &&
		  walrcv_readtimelinehistoryfile == NULL &&
		  walrcv_startstreaming == NULL &&
		  walrcv_endstreaming == NULL &&
		  walrcv_receive == NULL &&
		  walrcv_send == NULL &&
		  walrcv_disconnect == NULL))
		elog(ERROR, "libpqwalreceiver already loaded");

	/* Connection setup and teardown */
	walrcv_connect = libpqrcv_connect;
	walrcv_disconnect = libpqrcv_disconnect;

	/* Commands issued before streaming starts */
	walrcv_identify_system = libpqrcv_identify_system;
	walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;

	/* Streaming control and data transfer */
	walrcv_startstreaming = libpqrcv_startstreaming;
	walrcv_endstreaming = libpqrcv_endstreaming;
	walrcv_receive = libpqrcv_receive;
	walrcv_send = libpqrcv_send;
}
static bool libpq_select | ( | int | timeout_ms | ) | [static] |
Definition at line 304 of file libpqwalreceiver.c.
References Assert, EINTR, ereport, errcode_for_socket_access(), errmsg(), ERROR, NULL, PQsocket(), and select.
Referenced by libpqrcv_PQexec(), and libpqrcv_receive().
{
	/*
	 * Wait for the streamConn socket to become readable, or for timeout_ms
	 * milliseconds to pass.
	 *
	 * Returns true when poll()/select() reports the socket as ready, and
	 * false when the wait timed out or was interrupted (EINTR).  Any other
	 * failure, or a connection without an open socket, raises an ERROR.
	 */
	int			sock;
	int			nready;

	Assert(streamConn != NULL);

	sock = PQsocket(streamConn);
	if (sock < 0)
		ereport(ERROR,
				(errcode_for_socket_access(),
				 errmsg("socket not open")));

	/* We use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
	{
		struct pollfd pfd;

		pfd.fd = sock;
		pfd.events = POLLIN | POLLERR;
		pfd.revents = 0;

		/* timeout_ms is handed to poll() unchanged */
		nready = poll(&pfd, 1, timeout_ms);
	}
#else							/* !HAVE_POLL */
	{
		fd_set		readfds;
		struct timeval tv;
		struct timeval *tvp = NULL;

		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		/*
		 * A negative timeout_ms leaves the timeval pointer NULL; otherwise
		 * split the milliseconds into seconds and microseconds.
		 */
		if (timeout_ms >= 0)
		{
			tv.tv_sec = timeout_ms / 1000;
			tv.tv_usec = (timeout_ms % 1000) * 1000;
			tvp = &tv;
		}

		nready = select(sock + 1, &readfds, NULL, NULL, tvp);
	}
#endif   /* HAVE_POLL */

	if (nready == 0)
		return false;			/* timed out */

	if (nready < 0)
	{
		if (errno == EINTR)
			return false;		/* interrupted by a signal */

		ereport(ERROR,
				(errcode_for_socket_access(),
				 errmsg("select() failed: %m")));
	}

	return true;
}
static void libpqrcv_connect | ( | char * | conninfo | ) | [static] |
Definition at line 89 of file libpqwalreceiver.c.
References CONNECTION_OK, ereport, errmsg(), ERROR, MAXCONNINFO, PQconnectdb(), PQerrorMessage(), PQstatus(), and snprintf().
{
	/*
	 * Open the replication connection described by conninfo and store it in
	 * streamConn.  Raises an ERROR, including libpq's error message, if the
	 * connection does not reach CONNECTION_OK.
	 */
	char		repl_conninfo[MAXCONNINFO + 75];

	/*
	 * Connect using deliberately undocumented parameter: replication. The
	 * database name is ignored by the server in replication mode, but specify
	 * "replication" for .pgpass lookup.
	 *
	 * The caller's string comes first and our fixed options are appended to
	 * it; the buffer is sized as MAXCONNINFO plus room for that suffix.
	 */
	snprintf(repl_conninfo, sizeof(repl_conninfo),
			 "%s dbname=replication replication=true fallback_application_name=walreceiver",
			 conninfo);

	streamConn = PQconnectdb(repl_conninfo);

	if (PQstatus(streamConn) == CONNECTION_OK)
		return;					/* connected */

	ereport(ERROR,
			(errmsg("could not connect to the primary server: %s",
					PQerrorMessage(streamConn))));
}
static void libpqrcv_disconnect | ( | void | ) | [static] |
Definition at line 437 of file libpqwalreceiver.c.
References PQfinish().
{
	/*
	 * Close the replication connection and forget about it.  The global is
	 * reset to NULL so that it never keeps pointing at a finished
	 * connection.
	 */
	PGconn	   *conn = streamConn;

	streamConn = NULL;
	PQfinish(conn);
}
static void libpqrcv_endstreaming | ( | TimeLineID * | next_tli | ) | [static] |
Definition at line 206 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, NULL, pg_atoi(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQflush(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), and PQresultStatus().
{
	/*
	 * Finish the COPY stream on our side and collect the server's closing
	 * results.
	 *
	 * On return, *next_tli holds the timeline ID reported by the server, or
	 * 0 if the server sent no result set.  Raises an ERROR if the end-of-copy
	 * message cannot be sent or the server's response is not what we expect.
	 *
	 * Every PGresult obtained here is released with PQclear() before we
	 * return or raise an error.
	 */
	PGresult   *res;

	if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
		ereport(ERROR,
				(errmsg("could not send end-of-streaming message to primary: %s",
						PQerrorMessage(streamConn))));

	/*
	 * After COPY is finished, we should receive a result set indicating the
	 * next timeline's ID, or just CommandComplete if the server was shut down.
	 *
	 * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
	 * would also be possible. However, at the moment this function is only
	 * called after receiving CopyDone from the backend - the walreceiver
	 * never terminates replication on its own initiative.
	 */
	res = PQgetResult(streamConn);
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		/* Read the next timeline's ID */
		if (PQnfields(res) != 1 || PQntuples(res) != 1)
		{
			PQclear(res);
			ereport(ERROR,
					(errmsg("unexpected result set after end-of-streaming")));
		}
		*next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
		PQclear(res);

		/* the result set should be followed by CommandComplete */
		res = PQgetResult(streamConn);
	}
	else
		*next_tli = 0;

	if (PQresultStatus(res) != PGRES_COMMAND_OK)
	{
		/* The message below comes from the connection, not from res */
		PQclear(res);
		ereport(ERROR,
				(errmsg("error reading result of streaming command: %s",
						PQerrorMessage(streamConn))));
	}

	/* Done with the CommandComplete result; release it before moving on */
	PQclear(res);

	/* Verify that there are no more results */
	res = PQgetResult(streamConn);
	if (res != NULL)
	{
		PQclear(res);
		ereport(ERROR,
				(errmsg("unexpected result after CommandComplete: %s",
						PQerrorMessage(streamConn))));
	}
}
static void libpqrcv_identify_system | ( | TimeLineID * | primary_tli | ) | [static] |
Definition at line 114 of file libpqwalreceiver.c.
References ereport, errdetail(), errmsg(), ERROR, GetSystemIdentifier(), libpqrcv_PQexec(), pg_atoi(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), and snprintf().
{
	/*
	 * Run IDENTIFY_SYSTEM on the primary, store its timeline ID in
	 * *primary_tli, and check that its system identifier matches ours.
	 *
	 * Raises an ERROR if the command fails, if the reply is not exactly one
	 * row of three columns, or if the two system identifiers differ.
	 */
	PGresult   *res;
	char	   *primary_sysid;
	char		standby_sysid[32];

	/*
	 * Get the system identifier and timeline ID as a DataRow message from the
	 * primary server.
	 */
	res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		PQclear(res);
		ereport(ERROR,
				(errmsg("could not receive database system identifier and timeline ID from "
						"the primary server: %s",
						PQerrorMessage(streamConn))));
	}
	if (PQnfields(res) != 3 || PQntuples(res) != 1)
	{
		/* Fetch the counts before the result is released */
		int			ntuples = PQntuples(res);
		int			nfields = PQnfields(res);

		PQclear(res);
		ereport(ERROR,
				(errmsg("invalid response from primary server"),
				 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
						   ntuples, nfields)));
	}

	/* Column 0 is the system identifier, column 1 the timeline ID */
	primary_sysid = PQgetvalue(res, 0, 0);
	*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), sizeof(uint32), 0);

	/*
	 * Confirm that the system identifier of the primary is the same as ours.
	 */
	snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
			 GetSystemIdentifier());
	if (strcmp(primary_sysid, standby_sysid) != 0)
	{
		/*
		 * primary_sysid points into storage owned by res.  Take a private
		 * copy before PQclear() so the error detail below does not read
		 * freed memory.
		 */
		primary_sysid = pstrdup(primary_sysid);
		PQclear(res);
		ereport(ERROR,
				(errmsg("database system identifier differs between the primary and standby"),
				 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
						   primary_sysid, standby_sysid)));
	}
	PQclear(res);
}
static PGresult * libpqrcv_PQexec | ( | const char * | query | ) | [static] |
Definition at line 373 of file libpqwalreceiver.c.
References CONNECTION_BAD, libpq_select(), NULL, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQresultStatus(), PQsendQuery(), and PQstatus().
Referenced by libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().
{
	/*
	 * Send a query on streamConn and wait for its result, sleeping in
	 * libpq_select() rather than inside libpq while the reply is pending.
	 *
	 * Returns the last PGresult produced by the query, or the first one that
	 * puts the connection into a COPY state or is obtained while the
	 * connection is CONNECTION_BAD.  Returns NULL if the query could not be
	 * sent or if reading from the connection failed; no result is leaked in
	 * that case.  A non-NULL return value is to be released by the caller
	 * with PQclear().
	 */
	PGresult   *result = NULL;
	PGresult   *lastResult = NULL;

	/*
	 * PQexec() silently discards any prior query results on the connection.
	 * This is not required for walreceiver since it's expected that walsender
	 * won't generate any such junk results.
	 */

	/*
	 * Submit a query. Since we don't use non-blocking mode, this also can
	 * block. But its risk is relatively small, so we ignore that for now.
	 */
	if (!PQsendQuery(streamConn, query))
		return NULL;

	for (;;)
	{
		/*
		 * Receive data until PQgetResult is ready to get the result without
		 * blocking.
		 */
		while (PQisBusy(streamConn))
		{
			/*
			 * We don't need to break down the sleep into smaller increments,
			 * and check for interrupts after each nap, since we can just
			 * elog(FATAL) within SIGTERM signal handler if the signal arrives
			 * in the middle of establishment of replication connection.
			 */
			if (!libpq_select(-1))
				continue;		/* interrupted */
			if (PQconsumeInput(streamConn) == 0)
			{
				/*
				 * Trouble.  Drop any result kept from an earlier iteration
				 * so it is not leaked; PQclear(NULL) is a no-op.
				 */
				PQclear(lastResult);
				return NULL;
			}
		}

		/*
		 * Emulate the PQexec()'s behavior of returning the last result when
		 * there are many. Since walsender will never generate multiple
		 * results, we skip the concatenation of error messages.
		 */
		result = PQgetResult(streamConn);
		if (result == NULL)
			break;				/* query is complete */

		/* Keep only the newest result */
		PQclear(lastResult);
		lastResult = result;

		/* Stop once a COPY has started or the connection has gone bad */
		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
			PQstatus(streamConn) == CONNECTION_BAD)
			break;
	}

	return lastResult;
}
static void libpqrcv_readtimelinehistoryfile | ( | TimeLineID | tli, | |
char ** | filename, | |||
char ** | content, | |||
int * | len | |||
) | [static] |
Definition at line 257 of file libpqwalreceiver.c.
References ereport, errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), palloc(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and snprintf().
{
	/*
	 * Fetch the history file of timeline tli from the primary.
	 *
	 * On success *filename receives a pstrdup'd copy of the first column,
	 * *len the byte length of the second column, and *content a palloc'd
	 * copy of exactly *len bytes of the second column (no terminator is
	 * appended).  Raises an ERROR if the command fails or the reply is not
	 * one row of two columns.
	 */
	char		cmd[64];
	PGresult   *res;
	int			ncols;
	int			nrows;

	/*
	 * Request the primary to send over the history file for given timeline.
	 */
	snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
	res = libpqrcv_PQexec(cmd);

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		PQclear(res);
		ereport(ERROR,
				(errmsg("could not receive timeline history file from "
						"the primary server: %s",
						PQerrorMessage(streamConn))));
	}

	/* Capture the result's shape up front so it can be reported after PQclear */
	ncols = PQnfields(res);
	nrows = PQntuples(res);
	if (!(ncols == 2 && nrows == 1))
	{
		PQclear(res);
		ereport(ERROR,
				(errmsg("invalid response from primary server"),
				 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
						   nrows, ncols)));
	}

	/* Copy both columns out of the result before releasing it */
	*filename = pstrdup(PQgetvalue(res, 0, 0));

	*len = PQgetlength(res, 0, 1);
	*content = palloc(*len);
	memcpy(*content, PQgetvalue(res, 0, 1), *len);

	PQclear(res);
}
static int libpqrcv_receive | ( | int | timeout, | |
char ** | buffer | |||
) | [static] |
Definition at line 461 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, libpq_select(), NULL, PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQgetResult(), PQresultStatus(), and recvBuf.
{
	/*
	 * Try to fetch one CopyData message from the WAL stream.
	 *
	 * Returns the message length and points *buffer at it when a message is
	 * available, 0 when nothing arrived (within timeout, if one was given),
	 * and -1 when the stream has ended.  Raises an ERROR on failure.
	 *
	 * The returned buffer lives in recvBuf and is freed at the start of the
	 * next call.
	 */
	int			nread;

	/* Release the buffer handed out by the previous call, if any */
	if (recvBuf != NULL)
		PQfreemem(recvBuf);
	recvBuf = NULL;

	/* Try to receive a CopyData message */
	nread = PQgetCopyData(streamConn, &recvBuf, 1);

	if (nread == 0)
	{
		/*
		 * No data available yet. If the caller requested to block, wait for
		 * more data to arrive.
		 */
		if (timeout > 0 && !libpq_select(timeout))
			return 0;

		if (PQconsumeInput(streamConn) == 0)
			ereport(ERROR,
					(errmsg("could not receive data from WAL stream: %s",
							PQerrorMessage(streamConn))));

		/* Now that we've consumed some input, try again */
		nread = PQgetCopyData(streamConn, &recvBuf, 1);
		if (nread == 0)
			return 0;
	}

	if (nread == -1)			/* end-of-streaming or error */
	{
		PGresult   *res = PQgetResult(streamConn);
		int			stream_ended;

		/* Decide from the result status first, then release the result */
		stream_ended = (PQresultStatus(res) == PGRES_COMMAND_OK ||
						PQresultStatus(res) == PGRES_COPY_IN);
		PQclear(res);

		if (stream_ended)
			return -1;

		ereport(ERROR,
				(errmsg("could not receive data from WAL stream: %s",
						PQerrorMessage(streamConn))));
	}

	if (nread < -1)
		ereport(ERROR,
				(errmsg("could not receive data from WAL stream: %s",
						PQerrorMessage(streamConn))));

	/* Return received messages to caller */
	*buffer = recvBuf;
	return nread;
}
static void libpqrcv_send | ( | const char * | buffer, | |
int | nbytes | |||
) | [static] |
Definition at line 528 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, PQerrorMessage(), PQflush(), and PQputCopyData().
{
	/*
	 * Send nbytes of buffer to the primary as COPY data and flush the
	 * connection.  Raises an ERROR if either step fails.
	 *
	 * The flush is attempted only when queuing the data succeeded; any
	 * nonzero PQflush() result counts as failure.
	 */
	if (PQputCopyData(streamConn, buffer, nbytes) > 0 &&
		PQflush(streamConn) == 0)
		return;					/* sent */

	ereport(ERROR,
			(errmsg("could not send data to WAL stream: %s",
					PQerrorMessage(streamConn))));
}
static bool libpqrcv_startstreaming | ( | TimeLineID | tli, | |
XLogRecPtr | startpoint | |||
) | [static] |
Definition at line 174 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, libpqrcv_PQexec(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear(), PQerrorMessage(), PQresultStatus(), and snprintf().
{
	/*
	 * Ask the primary to start streaming WAL on timeline tli from startpoint.
	 *
	 * Returns true if the connection entered COPY BOTH mode, and false if the
	 * server answered with a plain command-complete instead.  Any other
	 * outcome raises an ERROR.
	 */
	PGresult   *res;
	char		cmd[64];

	/*
	 * Start streaming from the point requested by startup process.  The
	 * 64-bit start position is sent as two 32-bit hexadecimal halves.
	 */
	snprintf(cmd, sizeof(cmd),
			 "START_REPLICATION %X/%X TIMELINE %u",
			 (uint32) (startpoint >> 32),
			 (uint32) startpoint,
			 tli);
	res = libpqrcv_PQexec(cmd);

	switch (PQresultStatus(res))
	{
		case PGRES_COMMAND_OK:
			/* Command completed without entering COPY mode */
			PQclear(res);
			return false;

		case PGRES_COPY_BOTH:
			/* Streaming has started */
			break;

		default:
			PQclear(res);
			ereport(ERROR,
					(errmsg("could not start WAL streaming: %s",
							PQerrorMessage(streamConn))));
			break;
	}

	PQclear(res);
	return true;
}
PG_MODULE_MAGIC
Definition at line 38 of file libpqwalreceiver.c.
char* recvBuf = NULL [static] |
Definition at line 46 of file libpqwalreceiver.c.
Referenced by libpqrcv_receive().
PGconn* streamConn = NULL [static] |
Definition at line 43 of file libpqwalreceiver.c.