#include "access/xlogdefs.h"

Go to the source code of this file.
Typedefs | |
| typedef bool(* | stream_stop_callback )(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) |
Functions | |
| bool | CheckServerVersionForStreaming (PGconn *conn) |
| bool | ReceiveXlogStream (PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix) |
| typedef bool(* stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) |
Definition at line 7 of file receivelog.h.
Definition at line 446 of file receivelog.c.
References _, PQparameterStatus(), PQserverVersion(), and progname.
Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().
{
int minServerMajor,
maxServerMajor;
int serverMajor;
/*
* The message format used in streaming replication changed in 9.3, so we
* cannot stream from older servers. And we don't support servers newer
* than the client; it might work, but we don't know, so err on the safe
* side.
*/
minServerMajor = 903;
maxServerMajor = PG_VERSION_NUM / 100;
serverMajor = PQserverVersion(conn) / 100;
if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
{
const char *serverver = PQparameterStatus(conn, "server_version");
fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
progname,
serverver ? serverver : "'unknown'",
"9.3");
return false;
}
return true;
}
| bool ReceiveXlogStream | ( | PGconn * | conn, | |
| XLogRecPtr | startpos, | |||
| uint32 | timeline, | |||
| char * | sysidentifier, | |||
| char * | basedir, | |||
| stream_stop_callback | stream_stop, | |||
| int | standby_message_timeout, | |||
| char * | partial_suffix | |||
| ) |
Definition at line 503 of file receivelog.c.
References _, CheckServerVersionForStreaming(), close, current_walfile_name, error(), existsTimeLineHistoryFile(), HandleCopyStream(), NULL, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, snprintf(), strerror(), walfile, and writeTimeLineHistoryFile().
Referenced by LogStreamerMain(), and StreamLog().
{
char query[128];
PGresult *res;
XLogRecPtr stoppos;
/*
* The caller should've checked the server version already, but doesn't do
* any harm to check it here too.
*/
if (!CheckServerVersionForStreaming(conn))
return false;
if (sysidentifier != NULL)
{
/* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
fprintf(stderr,
_("%s: could not send replication command \"%s\": %s"),
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
PQclear(res);
return false;
}
if (PQnfields(res) != 3 || PQntuples(res) != 1)
{
fprintf(stderr,
_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3);
PQclear(res);
return false;
}
if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{
fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"),
progname);
PQclear(res);
return false;
}
if (timeline > atoi(PQgetvalue(res, 0, 1)))
{
fprintf(stderr,
_("%s: starting timeline %u is not present in the server\n"),
progname, timeline);
PQclear(res);
return false;
}
PQclear(res);
}
while (1)
{
/*
* Fetch the timeline history file for this timeline, if we don't
* have it already.
*/
if (!existsTimeLineHistoryFile(basedir, timeline))
{
snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
/* FIXME: we might send it ok, but get an error */
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
PQclear(res);
return false;
}
/*
* The response to TIMELINE_HISTORY is a single row result set
* with two fields: filename and content
*/
if (PQnfields(res) != 2 || PQntuples(res) != 1)
{
fprintf(stderr,
_("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 2);
}
/* Write the history file to disk */
writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0),
PQgetvalue(res, 0, 1));
PQclear(res);
}
/*
* Before we start streaming from the requested location, check
* if the callback tells us to stop here.
*/
if (stream_stop(startpos, timeline, false))
return true;
/* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
(uint32) (startpos >> 32), (uint32) startpos,
timeline);
res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, "START_REPLICATION", PQresultErrorMessage(res));
PQclear(res);
return false;
}
PQclear(res);
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
&stoppos);
if (res == NULL)
goto error;
/*
* Streaming finished.
*
* There are two possible reasons for that: a controlled shutdown,
* or we reached the end of the current timeline. In case of
* end-of-timeline, the server sends a result set after Copy has
* finished, containing the next timeline's ID. Read that, and
* restart streaming from the next timeline.
*/
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
* End-of-timeline. Read the next timeline's ID.
*/
uint32 newtimeline;
newtimeline = atoi(PQgetvalue(res, 0, 0));
PQclear(res);
if (newtimeline <= timeline)
{
/* shouldn't happen */
fprintf(stderr,
"server reported unexpected next timeline %u, following timeline %u\n",
newtimeline, timeline);
goto error;
}
/* Read the final result, which should be CommandComplete. */
res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res));
goto error;
}
PQclear(res);
/*
* Loop back to start streaming from the new timeline.
* Always start streaming at the beginning of a segment.
*/
timeline = newtimeline;
startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
continue;
}
else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
/*
* End of replication (ie. controlled shut down of the server).
*
* Check if the callback thinks it's OK to stop here. If not,
* complain.
*/
if (stream_stop(stoppos, timeline, false))
return true;
else
{
fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
progname);
goto error;
}
}
else
{
/* Server returned an error. */
fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res));
goto error;
}
}
error:
if (walfile != -1 && close(walfile) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno));
walfile = -1;
return false;
}
1.7.1