#include "access/xlogdefs.h"
Go to the source code of this file.
Typedefs | |
typedef bool(* | stream_stop_callback )(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) |
Functions | |
bool | CheckServerVersionForStreaming (PGconn *conn) |
bool | ReceiveXlogStream (PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix) |
typedef bool(* stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) |
Definition at line 7 of file receivelog.h.
Definition at line 446 of file receivelog.c.
References _, PQparameterStatus(), PQserverVersion(), and progname.
Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().
{
	/*
	 * Decide whether the server behind "conn" can be streamed from.
	 *
	 * The streaming replication message format changed in 9.3, so anything
	 * older cannot be handled.  Servers newer than this client are refused
	 * as well: they might work, but that is unknown, so stay on the safe
	 * side.
	 *
	 * Returns true when the server's major version lies in the supported
	 * range; otherwise reports the problem on stderr and returns false.
	 */
	const int	oldest_major = 903;
	const int	newest_major = PG_VERSION_NUM / 100;
	const int	server_major = PQserverVersion(conn) / 100;

	/* Supported version: nothing more to do. */
	if (server_major >= oldest_major && server_major <= newest_major)
		return true;

	{
		/* Prefer the server's own version string for the message. */
		const char *reported = PQparameterStatus(conn, "server_version");

		if (reported == NULL)
			reported = "'unknown'";

		fprintf(stderr,
				_("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
				progname, reported, "9.3");
	}

	return false;
}
bool ReceiveXlogStream | ( | PGconn * | conn, | |
XLogRecPtr | startpos, | |||
uint32 | timeline, | |||
char * | sysidentifier, | |||
char * | basedir, | |||
stream_stop_callback | stream_stop, | |||
int | standby_message_timeout, | |||
char * | partial_suffix | |||
) |
Definition at line 503 of file receivelog.c.
References _, CheckServerVersionForStreaming(), close, current_walfile_name, error(), existsTimeLineHistoryFile(), HandleCopyStream(), NULL, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, snprintf(), strerror(), walfile, and writeTimeLineHistoryFile().
Referenced by LogStreamerMain(), and StreamLog().
{
	/*
	 * Receive a WAL stream over "conn", starting at "startpos" on
	 * "timeline", and keep following the server onto later timelines when
	 * it reports an end-of-timeline.
	 *
	 * If "sysidentifier" is not NULL, IDENTIFY_SYSTEM is issued first and the
	 * connection is rejected when the system identifier differs or when the
	 * requested timeline is beyond the one the server reports.
	 *
	 * "stream_stop" is consulted before streaming starts on each timeline and
	 * again when the server ends replication; the function returns true as
	 * soon as the callback agrees to stop.  Every failure is reported on
	 * stderr and yields false.  Failures that occur after streaming has
	 * started also close the currently open WAL file, if any.
	 *
	 * "basedir", "standby_message_timeout" and "partial_suffix" are passed
	 * through to the helpers that write files and handle the copy stream.
	 */
	char		query[128];
	PGresult   *res;
	XLogRecPtr	stoppos;

	/*
	 * The caller should've checked the server version already, but doesn't do
	 * any harm to check it here too.
	 */
	if (!CheckServerVersionForStreaming(conn))
		return false;

	if (sysidentifier != NULL)
	{
		/* Validate system identifier hasn't changed */
		res = PQexec(conn, "IDENTIFY_SYSTEM");
		if (PQresultStatus(res) != PGRES_TUPLES_OK)
		{
			fprintf(stderr,
					_("%s: could not send replication command \"%s\": %s"),
					progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
			PQclear(res);
			return false;
		}
		if (PQnfields(res) != 3 || PQntuples(res) != 1)
		{
			fprintf(stderr,
					_("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
					progname, PQntuples(res), PQnfields(res), 1, 3);
			PQclear(res);
			return false;
		}
		if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
		{
			fprintf(stderr,
					_("%s: system identifier does not match between base backup and streaming connection\n"),
					progname);
			PQclear(res);
			return false;
		}
		if (timeline > atoi(PQgetvalue(res, 0, 1)))
		{
			fprintf(stderr,
					_("%s: starting timeline %u is not present in the server\n"),
					progname, timeline);
			PQclear(res);
			return false;
		}
		PQclear(res);
	}

	while (1)
	{
		/*
		 * Fetch the timeline history file for this timeline, if we don't
		 * have it already.
		 */
		if (!existsTimeLineHistoryFile(basedir, timeline))
		{
			snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
			res = PQexec(conn, query);
			if (PQresultStatus(res) != PGRES_TUPLES_OK)
			{
				/* FIXME: we might send it ok, but get an error */
				fprintf(stderr,
						_("%s: could not send replication command \"%s\": %s"),
						progname, "TIMELINE_HISTORY",
						PQresultErrorMessage(res));
				PQclear(res);
				return false;
			}

			/*
			 * The response to TIMELINE_HISTORY is a single row result set
			 * with two fields: filename and content
			 */
			if (PQnfields(res) != 2 || PQntuples(res) != 1)
			{
				fprintf(stderr,
						_("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
						progname, PQntuples(res), PQnfields(res), 1, 2);

				/*
				 * An oversized response is tolerated (the first row and
				 * first two fields are still used), but if the fields we
				 * are about to read do not exist there is nothing valid to
				 * write, so give up instead of reading a missing value.
				 */
				if (PQnfields(res) < 2 || PQntuples(res) < 1)
				{
					PQclear(res);
					return false;
				}
			}

			/* Write the history file to disk */
			writeTimeLineHistoryFile(basedir, timeline,
									 PQgetvalue(res, 0, 0),
									 PQgetvalue(res, 0, 1));

			PQclear(res);
		}

		/*
		 * Before we start streaming from the requested location, check
		 * if the callback tells us to stop here.
		 */
		if (stream_stop(startpos, timeline, false))
			return true;

		/* Initiate the replication stream at specified location */
		snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
				 (uint32) (startpos >> 32), (uint32) startpos,
				 timeline);
		res = PQexec(conn, query);
		if (PQresultStatus(res) != PGRES_COPY_BOTH)
		{
			fprintf(stderr,
					_("%s: could not send replication command \"%s\": %s"),
					progname, "START_REPLICATION", PQresultErrorMessage(res));
			PQclear(res);
			return false;
		}
		PQclear(res);

		/* Stream the WAL */
		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
							   standby_message_timeout, partial_suffix,
							   &stoppos);
		if (res == NULL)
			goto error;

		/*
		 * Streaming finished.
		 *
		 * There are two possible reasons for that: a controlled shutdown,
		 * or we reached the end of the current timeline. In case of
		 * end-of-timeline, the server sends a result set after Copy has
		 * finished, containing the next timeline's ID. Read that, and
		 * restart streaming from the next timeline.
		 */
		if (PQresultStatus(res) == PGRES_TUPLES_OK)
		{
			/*
			 * End-of-timeline. Read the next timeline's ID.
			 */
			uint32		newtimeline;

			/* Make sure the value we are about to read actually exists. */
			if (PQntuples(res) < 1 || PQnfields(res) < 1)
			{
				fprintf(stderr,
						_("%s: unexpected result set after end-of-timeline: got %d rows and %d fields\n"),
						progname, PQntuples(res), PQnfields(res));
				PQclear(res);
				goto error;
			}

			newtimeline = atoi(PQgetvalue(res, 0, 0));
			PQclear(res);

			if (newtimeline <= timeline)
			{
				/* shouldn't happen */
				fprintf(stderr,
						"server reported unexpected next timeline %u, following timeline %u\n",
						newtimeline, timeline);
				goto error;
			}

			/* Read the final result, which should be CommandComplete. */
			res = PQgetResult(conn);
			if (PQresultStatus(res) != PGRES_COMMAND_OK)
			{
				fprintf(stderr,
						_("%s: unexpected termination of replication stream: %s"),
						progname, PQresultErrorMessage(res));
				/* Release the result before leaving; it is not used again. */
				PQclear(res);
				goto error;
			}
			PQclear(res);

			/*
			 * Loop back to start streaming from the new timeline.
			 * Always start streaming at the beginning of a segment.
			 */
			timeline = newtimeline;
			startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
			continue;
		}
		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
		{
			/* The result carries nothing further we need; free it now. */
			PQclear(res);

			/*
			 * End of replication (ie. controlled shut down of the server).
			 *
			 * Check if the callback thinks it's OK to stop here. If not,
			 * complain.
			 */
			if (stream_stop(stoppos, timeline, false))
				return true;
			else
			{
				fprintf(stderr,
						_("%s: replication stream was terminated before stop point\n"),
						progname);
				goto error;
			}
		}
		else
		{
			/* Server returned an error. */
			fprintf(stderr,
					_("%s: unexpected termination of replication stream: %s"),
					progname, PQresultErrorMessage(res));
			PQclear(res);
			goto error;
		}
	}

error:
	/* Close the WAL file that was being written, if one is still open. */
	if (walfile != -1 && close(walfile) != 0)
		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
				progname, current_walfile_name, strerror(errno));
	walfile = -1;
	return false;
}