Header And Logo

PostgreSQL
| The world's most advanced open source database.

Typedefs | Functions

receivelog.h File Reference

#include "access/xlogdefs.h"
Include dependency graph for receivelog.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Typedefs

typedef bool(* stream_stop_callback )(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Functions

bool CheckServerVersionForStreaming (PGconn *conn)
bool ReceiveXlogStream (PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix)

Typedef Documentation

typedef bool(* stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)

Definition at line 7 of file receivelog.h.


Function Documentation

bool CheckServerVersionForStreaming ( PGconn conn  ) 

Definition at line 446 of file receivelog.c.

References _, PQparameterStatus(), PQserverVersion(), and progname.

Referenced by BaseBackup(), ReceiveXlogStream(), and StreamLog().

{
    int         minServerMajor,
                maxServerMajor;
    int         serverMajor;

    /*
     * The message format used in streaming replication changed in 9.3, so we
     * cannot stream from older servers. And we don't support servers newer
     * than the client; it might work, but we don't know, so err on the safe
     * side.
     */
    minServerMajor = 903;
    maxServerMajor = PG_VERSION_NUM / 100;
    serverMajor = PQserverVersion(conn) / 100;
    if (serverMajor < minServerMajor || serverMajor > maxServerMajor)
    {
        const char *serverver = PQparameterStatus(conn, "server_version");
        fprintf(stderr, _("%s: incompatible server version %s; streaming is only supported with server version %s\n"),
                progname,
                serverver ? serverver : "'unknown'",
                "9.3");
        return false;
    }
    return true;
}

bool ReceiveXlogStream ( PGconn conn,
XLogRecPtr  startpos,
uint32  timeline,
char *  sysidentifier,
char *  basedir,
stream_stop_callback  stream_stop,
int  standby_message_timeout,
char *  partial_suffix 
)

Definition at line 503 of file receivelog.c.

References _, CheckServerVersionForStreaming(), close, current_walfile_name, error(), existsTimeLineHistoryFile(), HandleCopyStream(), NULL, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), progname, snprintf(), strerror(), walfile, and writeTimeLineHistoryFile().

Referenced by LogStreamerMain(), and StreamLog().

{
    char        query[128];
    PGresult   *res;
    XLogRecPtr  stoppos;

    /*
     * The caller should've checked the server version already, but doesn't do
     * any harm to check it here too.
     */
    if (!CheckServerVersionForStreaming(conn))
        return false;

    if (sysidentifier != NULL)
    {
        /* Validate system identifier hasn't changed */
        res = PQexec(conn, "IDENTIFY_SYSTEM");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr,
                    _("%s: could not send replication command \"%s\": %s"),
                    progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
            PQclear(res);
            return false;
        }
        if (PQnfields(res) != 3 || PQntuples(res) != 1)
        {
            fprintf(stderr,
                    _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
                    progname, PQntuples(res), PQnfields(res), 1, 3);
            PQclear(res);
            return false;
        }
        if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
        {
            fprintf(stderr,
                    _("%s: system identifier does not match between base backup and streaming connection\n"),
                    progname);
            PQclear(res);
            return false;
        }
        if (timeline > atoi(PQgetvalue(res, 0, 1)))
        {
            fprintf(stderr,
                    _("%s: starting timeline %u is not present in the server\n"),
                    progname, timeline);
            PQclear(res);
            return false;
        }
        PQclear(res);
    }

    while (1)
    {
        /*
         * Fetch the timeline history file for this timeline, if we don't
         * have it already.
         */
        if (!existsTimeLineHistoryFile(basedir, timeline))
        {
            snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline);
            res = PQexec(conn, query);
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
            {
                /* FIXME: we might send it ok, but get an error */
                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                        progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
                PQclear(res);
                return false;
            }

            /*
             * The response to TIMELINE_HISTORY is a single row result set
             * with two fields: filename and content
             */
            if (PQnfields(res) != 2 || PQntuples(res) != 1)
            {
                fprintf(stderr,
                        _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
                    progname, PQntuples(res), PQnfields(res), 1, 2);
            }

            /* Write the history file to disk */
            writeTimeLineHistoryFile(basedir, timeline,
                                     PQgetvalue(res, 0, 0),
                                     PQgetvalue(res, 0, 1));

            PQclear(res);
        }

        /*
         * Before we start streaming from the requested location, check
         * if the callback tells us to stop here.
         */
        if (stream_stop(startpos, timeline, false))
            return true;

        /* Initiate the replication stream at specified location */
        snprintf(query, sizeof(query), "START_REPLICATION %X/%X TIMELINE %u",
                 (uint32) (startpos >> 32), (uint32) startpos,
                 timeline);
        res = PQexec(conn, query);
        if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
            fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                    progname, "START_REPLICATION", PQresultErrorMessage(res));
            PQclear(res);
            return false;
        }
        PQclear(res);

        /* Stream the WAL */
        res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
                               standby_message_timeout, partial_suffix,
                               &stoppos);
        if (res == NULL)
            goto error;

        /*
         * Streaming finished.
         *
         * There are two possible reasons for that: a controlled shutdown,
         * or we reached the end of the current timeline. In case of
         * end-of-timeline, the server sends a result set after Copy has
         * finished, containing the next timeline's ID. Read that, and
         * restart streaming from the next timeline.
         */

        if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
            /*
             * End-of-timeline. Read the next timeline's ID.
             */
            uint32      newtimeline;

            newtimeline = atoi(PQgetvalue(res, 0, 0));
            PQclear(res);

            if (newtimeline <= timeline)
            {
                /* shouldn't happen */
                fprintf(stderr,
                        "server reported unexpected next timeline %u, following timeline %u\n",
                        newtimeline, timeline);
                goto error;
            }

            /* Read the final result, which should be CommandComplete. */
            res = PQgetResult(conn);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
                fprintf(stderr,
                        _("%s: unexpected termination of replication stream: %s"),
                        progname, PQresultErrorMessage(res));
                goto error;
            }
            PQclear(res);

            /*
             * Loop back to start streaming from the new timeline.
             * Always start streaming at the beginning of a segment.
             */
            timeline = newtimeline;
            startpos = stoppos - (stoppos % XLOG_SEG_SIZE);
            continue;
        }
        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
            /*
             * End of replication (ie. controlled shut down of the server).
             *
             * Check if the callback thinks it's OK to stop here. If not,
             * complain.
             */
            if (stream_stop(stoppos, timeline, false))
                return true;
            else
            {
                fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
                        progname);
                goto error;
            }
        }
        else
        {
            /* Server returned an error. */
            fprintf(stderr,
                    _("%s: unexpected termination of replication stream: %s"),
                    progname, PQresultErrorMessage(res));
            goto error;
        }
    }

error:
    if (walfile != -1 && close(walfile) != 0)
        fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
                progname, current_walfile_name, strerror(errno));
    walfile = -1;
    return false;
}