#include "postgres.h"
#include <unistd.h>
#include <sys/time.h>
#include "libpq-fe.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"
Go to the source code of this file.
Functions | |
| void | _PG_init (void) |
| static void | libpqrcv_connect (char *conninfo) |
| static void | libpqrcv_identify_system (TimeLineID *primary_tli) |
| static void | libpqrcv_readtimelinehistoryfile (TimeLineID tli, char **filename, char **content, int *len) |
| static bool | libpqrcv_startstreaming (TimeLineID tli, XLogRecPtr startpoint) |
| static void | libpqrcv_endstreaming (TimeLineID *next_tli) |
| static int | libpqrcv_receive (int timeout, char **buffer) |
| static void | libpqrcv_send (const char *buffer, int nbytes) |
| static void | libpqrcv_disconnect (void) |
| static bool | libpq_select (int timeout_ms) |
| static PGresult * | libpqrcv_PQexec (const char *query) |
Variables | |
| PG_MODULE_MAGIC | |
| static PGconn * | streamConn = NULL |
| static char * | recvBuf = NULL |
| void _PG_init | ( | void | ) |
Definition at line 66 of file libpqwalreceiver.c.
References elog, ERROR, NULL, walrcv_connect, walrcv_disconnect, walrcv_endstreaming, walrcv_identify_system, walrcv_readtimelinehistoryfile, walrcv_receive, walrcv_send, and walrcv_startstreaming.
{
    /*
     * Module initialization: publish this file's libpq-based implementations
     * through the walreceiver function-pointer hooks.
     *
     * If any hook is already set, that is reported as the module having been
     * loaded before.
     */
    bool        hook_already_set;

    hook_already_set = (walrcv_connect != NULL ||
                        walrcv_identify_system != NULL ||
                        walrcv_readtimelinehistoryfile != NULL ||
                        walrcv_startstreaming != NULL ||
                        walrcv_endstreaming != NULL ||
                        walrcv_receive != NULL ||
                        walrcv_send != NULL ||
                        walrcv_disconnect != NULL);
    if (hook_already_set)
        elog(ERROR, "libpqwalreceiver already loaded");

    /* Connection setup and teardown */
    walrcv_connect = libpqrcv_connect;
    walrcv_disconnect = libpqrcv_disconnect;

    /* Replication commands */
    walrcv_identify_system = libpqrcv_identify_system;
    walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
    walrcv_startstreaming = libpqrcv_startstreaming;
    walrcv_endstreaming = libpqrcv_endstreaming;

    /* Data transfer while streaming */
    walrcv_receive = libpqrcv_receive;
    walrcv_send = libpqrcv_send;
}
| static bool libpq_select | ( | int | timeout_ms | ) | [static] |
Definition at line 304 of file libpqwalreceiver.c.
References Assert, EINTR, ereport, errcode_for_socket_access(), errmsg(), ERROR, NULL, PQsocket(), and select.
Referenced by libpqrcv_PQexec(), and libpqrcv_receive().
{
    /*
     * Wait until the socket of streamConn becomes readable.
     *
     * timeout_ms is the maximum wait in milliseconds; a negative value means
     * wait without a time limit.
     *
     * Returns true if the wait call reported the socket as ready, false if it
     * timed out or was interrupted by a signal (EINTR).  Any other failure
     * is reported with ereport(ERROR).
     */
    int         sock;
    int         nready;

    Assert(streamConn != NULL);

    sock = PQsocket(streamConn);
    if (sock < 0)
        ereport(ERROR,
                (errcode_for_socket_access(),
                 errmsg("socket not open")));

    /* Prefer poll(2) when the platform has it, otherwise use select(2) */
    {
#ifdef HAVE_POLL
        struct pollfd pfd;

        pfd.fd = sock;
        pfd.events = POLLIN | POLLERR;
        pfd.revents = 0;

        nready = poll(&pfd, 1, timeout_ms);
#else                           /* !HAVE_POLL */
        fd_set      read_fds;
        struct timeval tv;
        struct timeval *tv_ptr = NULL;  /* NULL means no time limit */

        FD_ZERO(&read_fds);
        FD_SET(sock, &read_fds);

        if (timeout_ms >= 0)
        {
            tv.tv_sec = timeout_ms / 1000;
            tv.tv_usec = (timeout_ms % 1000) * 1000;
            tv_ptr = &tv;
        }

        nready = select(sock + 1, &read_fds, NULL, NULL, tv_ptr);
#endif   /* HAVE_POLL */
    }

    /* Socket is ready */
    if (nready > 0)
        return true;

    /* Timeout, or the wait was cut short by a signal */
    if (nready == 0 || errno == EINTR)
        return false;

    ereport(ERROR,
            (errcode_for_socket_access(),
             errmsg("select() failed: %m")));

    return true;
}
| static void libpqrcv_connect | ( | char * | conninfo | ) | [static] |
Definition at line 89 of file libpqwalreceiver.c.
References CONNECTION_OK, ereport, errmsg(), ERROR, MAXCONNINFO, PQconnectdb(), PQerrorMessage(), PQstatus(), and snprintf().
{
    /*
     * Open the replication connection to the primary and store it in
     * streamConn.  A connection that does not reach CONNECTION_OK is
     * reported with ereport(ERROR).
     *
     * The buffer leaves 75 bytes of headroom beyond MAXCONNINFO for the
     * options appended to the caller's conninfo string below.
     */
    char        repl_conninfo[MAXCONNINFO + 75];

    /*
     * Connect using deliberately undocumented parameter: replication. The
     * database name is ignored by the server in replication mode, but
     * "replication" is given so that .pgpass lookup has a name to match.
     */
    snprintf(repl_conninfo, sizeof(repl_conninfo),
             "%s dbname=replication replication=true fallback_application_name=walreceiver",
             conninfo);

    streamConn = PQconnectdb(repl_conninfo);

    if (PQstatus(streamConn) == CONNECTION_OK)
        return;

    ereport(ERROR,
            (errmsg("could not connect to the primary server: %s",
                    PQerrorMessage(streamConn))));
}
| static void libpqrcv_disconnect | ( | void | ) | [static] |
Definition at line 437 of file libpqwalreceiver.c.
References PQfinish().
{
/*
 * Disconnect from the primary: hand the connection in streamConn to
 * PQfinish(), then reset the pointer to NULL.
 */
PQfinish(streamConn);
/* Clear the static pointer so the finished connection is not referenced again */
streamConn = NULL;
}
| static void libpqrcv_endstreaming | ( | TimeLineID * | next_tli | ) | [static] |
Definition at line 206 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, NULL, pg_atoi(), PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQflush(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQputCopyEnd(), and PQresultStatus().
{
    /*
     * Stop streaming WAL data: send the end-of-copy message to the primary
     * and consume the results that follow it.
     *
     * *next_tli is set to the timeline ID reported by the server in the
     * result set that follows, or to 0 if no result set was sent.
     *
     * Any failure is reported with ereport(ERROR).  Every PGresult obtained
     * here is released with PQclear() before the function returns or raises
     * an error, so repeated calls do not accumulate result objects.
     */
    PGresult   *res;

    if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
        ereport(ERROR,
            (errmsg("could not send end-of-streaming message to primary: %s",
                    PQerrorMessage(streamConn))));

    /*
     * After COPY is finished, we should receive a result set indicating the
     * next timeline's ID, or just CommandComplete if the server was shut down.
     *
     * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
     * would also be possible. However, at the moment this function is only
     * called after receiving CopyDone from the backend - the walreceiver
     * never terminates replication on its own initiative.
     */
    res = PQgetResult(streamConn);
    if (PQresultStatus(res) == PGRES_TUPLES_OK)
    {
        /* Read the next timeline's ID */
        if (PQnfields(res) != 1 || PQntuples(res) != 1)
        {
            PQclear(res);
            ereport(ERROR,
                    (errmsg("unexpected result set after end-of-streaming")));
        }
        *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
        PQclear(res);

        /* the result set should be followed by CommandComplete */
        res = PQgetResult(streamConn);
    }
    else
        *next_tli = 0;

    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        /* The message comes from the connection, so res can go first */
        PQclear(res);
        ereport(ERROR,
                (errmsg("error reading result of streaming command: %s",
                        PQerrorMessage(streamConn))));
    }

    /*
     * Release the CommandComplete result before fetching the next one;
     * otherwise it is leaked when res is overwritten below.
     */
    PQclear(res);

    /* Verify that there are no more results */
    res = PQgetResult(streamConn);
    if (res != NULL)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("unexpected result after CommandComplete: %s",
                        PQerrorMessage(streamConn))));
    }
}
| static void libpqrcv_identify_system | ( | TimeLineID * | primary_tli | ) | [static] |
Definition at line 114 of file libpqwalreceiver.c.
References ereport, errdetail(), errmsg(), ERROR, GetSystemIdentifier(), libpqrcv_PQexec(), pg_atoi(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), and snprintf().
{
    /*
     * Run IDENTIFY_SYSTEM on the primary and validate the answer.
     *
     * *primary_tli is set to the timeline ID from the second column of the
     * reply.  The system identifier in the first column is compared with
     * the local one from GetSystemIdentifier(); a mismatch, a failed
     * command, or a reply that is not exactly 1 tuple with 3 fields is
     * reported with ereport(ERROR).
     */
    PGresult   *res;
    char       *primary_sysid;
    char        standby_sysid[32];

    /*
     * Get the system identifier and timeline ID as a DataRow message from the
     * primary server.
     */
    res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
        ereport(ERROR,
                (errmsg("could not receive database system identifier and timeline ID from "
                        "the primary server: %s",
                        PQerrorMessage(streamConn))));
    }
    if (PQnfields(res) != 3 || PQntuples(res) != 1)
    {
        /* Read the counts before the result is released */
        int         ntuples = PQntuples(res);
        int         nfields = PQnfields(res);

        PQclear(res);
        ereport(ERROR,
                (errmsg("invalid response from primary server"),
                 errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
                           ntuples, nfields)));
    }
    primary_sysid = PQgetvalue(res, 0, 0);
    *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);

    /*
     * Confirm that the system identifier of the primary is the same as ours.
     */
    snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
             GetSystemIdentifier());
    if (strcmp(primary_sysid, standby_sysid) != 0)
    {
        /*
         * primary_sysid points into storage owned by res.  Take a private
         * copy before PQclear() so the error message below does not read
         * freed memory.
         */
        primary_sysid = pstrdup(primary_sysid);
        PQclear(res);
        ereport(ERROR,
                (errmsg("database system identifier differs between the primary and standby"),
                 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                           primary_sysid, standby_sysid)));
    }
    PQclear(res);
}
| static PGresult * libpqrcv_PQexec | ( | const char * | query | ) | [static] |
Definition at line 373 of file libpqwalreceiver.c.
References CONNECTION_BAD, libpq_select(), NULL, PGRES_COPY_BOTH, PGRES_COPY_IN, PGRES_COPY_OUT, PQclear(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQresultStatus(), PQsendQuery(), and PQstatus().
Referenced by libpqrcv_identify_system(), libpqrcv_readtimelinehistoryfile(), and libpqrcv_startstreaming().
{
    /*
     * Send a query on streamConn and wait for its results, waiting on the
     * socket with libpq_select() rather than blocking inside libpq.
     *
     * Returns the last result obtained, or NULL if the query could not be
     * sent or input could not be consumed.  Collection stops early when a
     * result puts the connection into a COPY state or the connection goes
     * bad.
     *
     * Unlike PQexec(), prior results pending on the connection are not
     * discarded first; walsender is not expected to leave any behind.
     */
    PGresult   *latest = NULL;

    /*
     * Submit the query.  The connection is not in non-blocking mode, so this
     * call itself can block; that risk is considered small and is ignored
     * for now.
     */
    if (PQsendQuery(streamConn, query) == 0)
        return NULL;

    for (;;)
    {
        PGresult   *next;

        /*
         * Pull in data until PQgetResult can hand over a result without
         * blocking.
         *
         * The wait is not split into short naps with interrupt checks in
         * between: the SIGTERM handler can elog(FATAL) directly if the
         * signal arrives while the replication connection is being set up.
         * A false return from libpq_select() means the wait was
         * interrupted, so simply go around again.
         */
        while (PQisBusy(streamConn))
        {
            if (libpq_select(-1) && PQconsumeInput(streamConn) == 0)
                return NULL;    /* trouble */
        }

        /*
         * Mimic PQexec() by keeping only the most recent result.  Error
         * messages are not concatenated, since walsender never produces
         * more than one result.
         */
        next = PQgetResult(streamConn);
        if (next == NULL)
            return latest;      /* query is complete */

        PQclear(latest);
        latest = next;

        /* A COPY state ends result collection immediately */
        switch (PQresultStatus(latest))
        {
            case PGRES_COPY_IN:
            case PGRES_COPY_OUT:
            case PGRES_COPY_BOTH:
                return latest;
            default:
                break;
        }

        /* So does a broken connection */
        if (PQstatus(streamConn) == CONNECTION_BAD)
            return latest;
    }
}
| static void libpqrcv_readtimelinehistoryfile | ( | TimeLineID | tli, | |
| char ** | filename, | |||
| char ** | content, | |||
| int * | len | |||
| ) | [static] |
Definition at line 257 of file libpqwalreceiver.c.
References ereport, errdetail(), errmsg(), ERROR, libpqrcv_PQexec(), palloc(), PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQgetlength(), PQgetvalue(), PQnfields(), PQntuples(), PQresultStatus(), pstrdup(), and snprintf().
{
    /*
     * Fetch the history file of timeline 'tli' from the primary.
     *
     * On success *filename receives a pstrdup'd copy of the first column,
     * *content a palloc'd copy of the second column, and *len the length of
     * that second column as reported by PQgetlength().  A failed command or
     * a reply that is not exactly 1 tuple with 2 fields is reported with
     * ereport(ERROR).
     */
    char        request[64];
    PGresult   *reply;
    int         reply_fields;
    int         reply_tuples;

    /* Ask the primary for the history file of the given timeline */
    snprintf(request, sizeof(request), "TIMELINE_HISTORY %u", tli);
    reply = libpqrcv_PQexec(request);

    if (PQresultStatus(reply) != PGRES_TUPLES_OK)
    {
        PQclear(reply);
        ereport(ERROR,
                (errmsg("could not receive timeline history file from "
                        "the primary server: %s",
                        PQerrorMessage(streamConn))));
    }

    /* Capture the shape now so it can still be reported after PQclear() */
    reply_fields = PQnfields(reply);
    reply_tuples = PQntuples(reply);
    if (reply_fields != 2 || reply_tuples != 1)
    {
        PQclear(reply);
        ereport(ERROR,
                (errmsg("invalid response from primary server"),
                 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
                           reply_tuples, reply_fields)));
    }

    /* Column 0: file name */
    *filename = pstrdup(PQgetvalue(reply, 0, 0));

    /* Column 1: file content, copied by length */
    *len = PQgetlength(reply, 0, 1);
    *content = palloc(*len);
    memcpy(*content, PQgetvalue(reply, 0, 1), *len);

    PQclear(reply);
}
| static int libpqrcv_receive | ( | int | timeout, | |
| char ** | buffer | |||
| ) | [static] |
Definition at line 461 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, libpq_select(), NULL, PGRES_COMMAND_OK, PGRES_COPY_IN, PQclear(), PQconsumeInput(), PQerrorMessage(), PQfreemem(), PQgetCopyData(), PQgetResult(), PQresultStatus(), and recvBuf.
{
    /*
     * Receive one CopyData message from the WAL stream.
     *
     * Return value:
     *   > 0  length of the message; *buffer points at it.  The storage is
     *        recvBuf, which is freed at the start of the next call.
     *   0    no complete message is available (including when the wait
     *        timed out or was interrupted).
     *   -1   the copy has ended and the following result was
     *        PGRES_COMMAND_OK or PGRES_COPY_IN.
     *
     * All other conditions are reported with ereport(ERROR).
     *
     * 'timeout' is passed to libpq_select() when positive; with a
     * non-positive value no wait is performed.
     */
    int         nbytes;

    /* Drop the buffer handed out by the previous call, if any */
    if (recvBuf != NULL)
    {
        PQfreemem(recvBuf);
        recvBuf = NULL;
    }

    /* First attempt: is a CopyData message already buffered? */
    nbytes = PQgetCopyData(streamConn, &recvBuf, 1);
    if (nbytes == 0)
    {
        /*
         * Nothing yet.  Wait for the socket if the caller allowed it; give
         * up with 0 if the wait ends without the socket becoming ready.
         */
        if (timeout > 0 && !libpq_select(timeout))
            return 0;

        if (PQconsumeInput(streamConn) == 0)
            ereport(ERROR,
                    (errmsg("could not receive data from WAL stream: %s",
                            PQerrorMessage(streamConn))));

        /* Second attempt, now that some input has been consumed */
        nbytes = PQgetCopyData(streamConn, &recvBuf, 1);
        if (nbytes == 0)
            return 0;
    }

    if (nbytes < 0)
    {
        if (nbytes == -1)
        {
            /* end-of-streaming or error: the next result tells which */
            PGresult   *res = PQgetResult(streamConn);
            bool        stream_ended;

            stream_ended = (PQresultStatus(res) == PGRES_COMMAND_OK ||
                            PQresultStatus(res) == PGRES_COPY_IN);
            PQclear(res);

            if (stream_ended)
                return -1;
        }

        ereport(ERROR,
                (errmsg("could not receive data from WAL stream: %s",
                        PQerrorMessage(streamConn))));
    }

    /* Hand the received message to the caller */
    *buffer = recvBuf;
    return nbytes;
}
| static void libpqrcv_send | ( | const char * | buffer, | |
| int | nbytes | |||
| ) | [static] |
Definition at line 528 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, PQerrorMessage(), PQflush(), and PQputCopyData().
{
    /*
     * Send 'nbytes' bytes from 'buffer' to the WAL stream as copy data and
     * flush the connection.  A failure of either step is reported with
     * ereport(ERROR).
     */
    if (PQputCopyData(streamConn, buffer, nbytes) > 0 &&
        PQflush(streamConn) == 0)
        return;                 /* queued and flushed */

    ereport(ERROR,
            (errmsg("could not send data to WAL stream: %s",
                    PQerrorMessage(streamConn))));
}
| static bool libpqrcv_startstreaming | ( | TimeLineID | tli, | |
| XLogRecPtr | startpoint | |||
| ) | [static] |
Definition at line 174 of file libpqwalreceiver.c.
References ereport, errmsg(), ERROR, libpqrcv_PQexec(), PGRES_COMMAND_OK, PGRES_COPY_BOTH, PQclear(), PQerrorMessage(), PQresultStatus(), and snprintf().
{
    /*
     * Ask the primary to start streaming WAL on timeline 'tli' from
     * 'startpoint'.
     *
     * Returns true if the connection entered COPY BOTH mode, false if the
     * server answered with a plain PGRES_COMMAND_OK instead.  Any other
     * result status is reported with ereport(ERROR).
     */
    PGresult   *reply;
    char        request[64];
    uint32      start_hi = (uint32) (startpoint >> 32);
    uint32      start_lo = (uint32) startpoint;

    /* The start position is sent as two 32-bit halves in hex */
    snprintf(request, sizeof(request), "START_REPLICATION %X/%X TIMELINE %u",
             start_hi, start_lo,
             tli);
    reply = libpqrcv_PQexec(request);

    if (PQresultStatus(reply) == PGRES_COPY_BOTH)
    {
        PQclear(reply);
        return true;
    }

    if (PQresultStatus(reply) == PGRES_COMMAND_OK)
    {
        PQclear(reply);
        return false;
    }

    /* Anything else is a failure; the message comes from the connection */
    PQclear(reply);
    ereport(ERROR,
            (errmsg("could not start WAL streaming: %s",
                    PQerrorMessage(streamConn))));

    return true;
}
Definition at line 38 of file libpqwalreceiver.c.
char* recvBuf = NULL [static] |
Definition at line 46 of file libpqwalreceiver.c.
Referenced by libpqrcv_receive().
PGconn* streamConn = NULL [static] |
Definition at line 43 of file libpqwalreceiver.c.
1.7.1